From 7b5b14dbd029d716abe5e22158a103f09a143f1a Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Mon, 11 Dec 2023 11:23:24 +0100 Subject: [PATCH] refactor(cloudwatch): simplify logic (#3172) --- ...hanges_to_network_acls_alarm_configured.py | 32 ++++++----------- ...es_to_network_gateways_alarm_configured.py | 32 ++++++----------- ...o_network_route_tables_alarm_configured.py | 32 ++++++----------- ...dwatch_changes_to_vpcs_alarm_configured.py | 32 ++++++----------- ...ws_config_configuration_changes_enabled.py | 32 ++++++----------- ...loudtrail_configuration_changes_enabled.py | 32 ++++++----------- ...g_metric_filter_authentication_failures.py | 32 ++++++----------- ...metric_filter_aws_organizations_changes.py | 32 ++++++----------- ...isable_or_scheduled_deletion_of_kms_cmk.py | 32 ++++++----------- ...ric_filter_for_s3_bucket_policy_changes.py | 33 ++++++------------ ...dwatch_log_metric_filter_policy_changes.py | 32 ++++++----------- ...cloudwatch_log_metric_filter_root_usage.py | 32 ++++++----------- ...og_metric_filter_security_group_changes.py | 32 ++++++----------- ...h_log_metric_filter_sign_in_without_mfa.py | 32 ++++++----------- ...og_metric_filter_unauthorized_api_calls.py | 32 ++++++----------- .../services/cloudwatch/lib/metric_filters.py | 34 +++++++++++++++++++ 16 files changed, 185 insertions(+), 330 deletions(-) create mode 100644 prowler/providers/aws/services/cloudwatch/lib/metric_filters.py diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_acls_alarm_configured/cloudwatch_changes_to_network_acls_alarm_configured.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_acls_alarm_configured/cloudwatch_changes_to_network_acls_alarm_configured.py index ae4f0f37..6bf9e6f3 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_acls_alarm_configured/cloudwatch_changes_to_network_acls_alarm_configured.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_acls_alarm_configured/cloudwatch_changes_to_network_acls_alarm_configured.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_changes_to_network_acls_alarm_configured(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_gateways_alarm_configured/cloudwatch_changes_to_network_gateways_alarm_configured.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_gateways_alarm_configured/cloudwatch_changes_to_network_gateways_alarm_configured.py index c2446125..2ad693d1 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_gateways_alarm_configured/cloudwatch_changes_to_network_gateways_alarm_configured.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_gateways_alarm_configured/cloudwatch_changes_to_network_gateways_alarm_configured.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_changes_to_network_gateways_alarm_configured(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_route_tables_alarm_configured/cloudwatch_changes_to_network_route_tables_alarm_configured.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_route_tables_alarm_configured/cloudwatch_changes_to_network_route_tables_alarm_configured.py index 6d8655d4..51b4c6b0 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_route_tables_alarm_configured/cloudwatch_changes_to_network_route_tables_alarm_configured.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_network_route_tables_alarm_configured/cloudwatch_changes_to_network_route_tables_alarm_configured.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_changes_to_network_route_tables_alarm_configured(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_vpcs_alarm_configured/cloudwatch_changes_to_vpcs_alarm_configured.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_vpcs_alarm_configured/cloudwatch_changes_to_vpcs_alarm_configured.py index e95dacf1..41b20039 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_vpcs_alarm_configured/cloudwatch_changes_to_vpcs_alarm_configured.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_changes_to_vpcs_alarm_configured/cloudwatch_changes_to_vpcs_alarm_configured.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_changes_to_vpcs_alarm_configured(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.py index e43e7865..57791fd8 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -24,26 +25,13 @@ class cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_change report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.py index 78f505b0..90e8fcfa 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled/cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -24,26 +25,13 @@ class cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_change report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_authentication_failures/cloudwatch_log_metric_filter_authentication_failures.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_authentication_failures/cloudwatch_log_metric_filter_authentication_failures.py index 861950a2..17c627f1 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_authentication_failures/cloudwatch_log_metric_filter_authentication_failures.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_authentication_failures/cloudwatch_log_metric_filter_authentication_failures.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_authentication_failures(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_aws_organizations_changes/cloudwatch_log_metric_filter_aws_organizations_changes.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_aws_organizations_changes/cloudwatch_log_metric_filter_aws_organizations_changes.py index 667f35d6..b4a49a90 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_aws_organizations_changes/cloudwatch_log_metric_filter_aws_organizations_changes.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_aws_organizations_changes/cloudwatch_log_metric_filter_aws_organizations_changes.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_aws_organizations_changes(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.py index 483a9ec2..cb963ca1 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk/cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk(Chec report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.py index 4ed80240..1e80b5d2 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes/cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,14 @@ class cloudwatch_log_metric_filter_for_s3_bucket_policy_changes(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_policy_changes/cloudwatch_log_metric_filter_policy_changes.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_policy_changes/cloudwatch_log_metric_filter_policy_changes.py index dbfb9a06..b33e9425 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_policy_changes/cloudwatch_log_metric_filter_policy_changes.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_policy_changes/cloudwatch_log_metric_filter_policy_changes.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_policy_changes(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_root_usage/cloudwatch_log_metric_filter_root_usage.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_root_usage/cloudwatch_log_metric_filter_root_usage.py index 36f92790..4c78c4de 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_root_usage/cloudwatch_log_metric_filter_root_usage.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_root_usage/cloudwatch_log_metric_filter_root_usage.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_root_usage(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_security_group_changes/cloudwatch_log_metric_filter_security_group_changes.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_security_group_changes/cloudwatch_log_metric_filter_security_group_changes.py index 12b15696..e87275a5 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_security_group_changes/cloudwatch_log_metric_filter_security_group_changes.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_security_group_changes/cloudwatch_log_metric_filter_security_group_changes.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_security_group_changes(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_sign_in_without_mfa/cloudwatch_log_metric_filter_sign_in_without_mfa.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_sign_in_without_mfa/cloudwatch_log_metric_filter_sign_in_without_mfa.py index 3ac92822..bfa69df3 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_sign_in_without_mfa/cloudwatch_log_metric_filter_sign_in_without_mfa.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_sign_in_without_mfa/cloudwatch_log_metric_filter_sign_in_without_mfa.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_sign_in_without_mfa(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_unauthorized_api_calls/cloudwatch_log_metric_filter_unauthorized_api_calls.py b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_unauthorized_api_calls/cloudwatch_log_metric_filter_unauthorized_api_calls.py index 7e8200bc..a9a96fa7 100644 --- a/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_unauthorized_api_calls/cloudwatch_log_metric_filter_unauthorized_api_calls.py +++ b/prowler/providers/aws/services/cloudwatch/cloudwatch_log_metric_filter_unauthorized_api_calls/cloudwatch_log_metric_filter_unauthorized_api_calls.py @@ -1,5 +1,3 @@ -import re - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( cloudtrail_client, @@ -7,6 +5,9 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import ( from prowler.providers.aws.services.cloudwatch.cloudwatch_client import ( cloudwatch_client, ) +from prowler.providers.aws.services.cloudwatch.lib.metric_filters import ( + check_cloudwatch_log_metric_filter, +) from prowler.providers.aws.services.cloudwatch.logs_client import logs_client @@ -22,26 +23,13 @@ class cloudwatch_log_metric_filter_unauthorized_api_calls(Check): report.region = cloudwatch_client.region report.resource_id = cloudtrail_client.audited_account report.resource_arn = cloudtrail_client.audited_account_arn - # 1. Iterate for CloudWatch Log Group in CloudTrail trails - log_groups = [] - for trail in cloudtrail_client.trails: - if trail.log_group_arn: - log_groups.append(trail.log_group_arn.split(":")[6]) - # 2. Describe metric filters for previous log groups - for metric_filter in logs_client.metric_filters: - if metric_filter.log_group in log_groups: - if re.search(pattern, metric_filter.pattern, flags=re.DOTALL): - report.resource_id = metric_filter.log_group - report.resource_arn = metric_filter.arn - report.region = metric_filter.region - report.status = "FAIL" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." - # 3. Check if there is an alarm for the metric - for alarm in cloudwatch_client.metric_alarms: - if alarm.metric == metric_filter.metric: - report.status = "PASS" - report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." - break + report = check_cloudwatch_log_metric_filter( + pattern, + cloudtrail_client.trails, + logs_client.metric_filters, + cloudwatch_client.metric_alarms, + report, + ) findings.append(report) return findings diff --git a/prowler/providers/aws/services/cloudwatch/lib/metric_filters.py b/prowler/providers/aws/services/cloudwatch/lib/metric_filters.py new file mode 100644 index 00000000..ea6dcafc --- /dev/null +++ b/prowler/providers/aws/services/cloudwatch/lib/metric_filters.py @@ -0,0 +1,34 @@ +import re + +from prowler.lib.check.models import Check_Report_AWS + + +def check_cloudwatch_log_metric_filter( + metric_filter_pattern: str, + trails: list, + metric_filters: list, + metric_alarms: list, + report: Check_Report_AWS, +): + # 1. Iterate for CloudWatch Log Group in CloudTrail trails + log_groups = [] + for trail in trails: + if trail.log_group_arn: + log_groups.append(trail.log_group_arn.split(":")[6]) + # 2. Describe metric filters for previous log groups + for metric_filter in metric_filters: + if metric_filter.log_group in log_groups: + if re.search(metric_filter_pattern, metric_filter.pattern, flags=re.DOTALL): + report.resource_id = metric_filter.log_group + report.resource_arn = metric_filter.arn + report.region = metric_filter.region + report.status = "FAIL" + report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} but no alarms associated." + # 3. Check if there is an alarm for the metric + for alarm in metric_alarms: + if alarm.metric == metric_filter.metric: + report.status = "PASS" + report.status_extended = f"CloudWatch log group {metric_filter.log_group} found with metric filter {metric_filter.name} and alarms set." + break + + return report