fix(cloudwatch): allow " in regex patterns (#1943)

This commit is contained in:
Sergio Garcia
2023-02-21 16:46:23 +01:00
committed by GitHub
parent 5ac7cde577
commit 844ad70bb9
30 changed files with 1176 additions and 15 deletions

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_changes_to_network_acls_alarm_configured(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*CreateNetworkAcl.+\$\.eventName\s*=\s*CreateNetworkAclEntry.+\$\.eventName\s*=\s*DeleteNetworkAcl.+\$\.eventName\s*=\s*DeleteNetworkAclEntry.+\$\.eventName\s*=\s*ReplaceNetworkAclEntry.+\$\.eventName\s*=\s*ReplaceNetworkAclAssociation"
pattern = r"\$\.eventName\s*=\s*.?CreateNetworkAcl.+\$\.eventName\s*=\s*.?CreateNetworkAclEntry.+\$\.eventName\s*=\s*.?DeleteNetworkAcl.+\$\.eventName\s*=\s*.?DeleteNetworkAclEntry.+\$\.eventName\s*=\s*.?ReplaceNetworkAclEntry.+\$\.eventName\s*=\s*.?ReplaceNetworkAclAssociation.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_changes_to_network_gateways_alarm_configured(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*CreateCustomerGateway.+\$\.eventName\s*=\s*DeleteCustomerGateway.+\$\.eventName\s*=\s*AttachInternetGateway.+\$\.eventName\s*=\s*CreateInternetGateway.+\$\.eventName\s*=\s*DeleteInternetGateway.+\$\.eventName\s*=\s*DetachInternetGateway"
pattern = r"\$\.eventName\s*=\s*.?CreateCustomerGateway.+\$\.eventName\s*=\s*.?DeleteCustomerGateway.+\$\.eventName\s*=\s*.?AttachInternetGateway.+\$\.eventName\s*=\s*.?CreateInternetGateway.+\$\.eventName\s*=\s*.?DeleteInternetGateway.+\$\.eventName\s*=\s*.?DetachInternetGateway.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_changes_to_network_route_tables_alarm_configured(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*CreateRoute.+\$\.eventName\s*=\s*CreateRouteTable.+\$\.eventName\s*=\s*ReplaceRoute.+\$\.eventName\s*=\s*ReplaceRouteTableAssociation.+\$\.eventName\s*=\s*DeleteRouteTable.+\$\.eventName\s*=\s*DeleteRoute.+\$\.eventName\s*=\s*DisassociateRouteTable"
pattern = r"\$\.eventName\s*=\s*.?CreateRoute.+\$\.eventName\s*=\s*.?CreateRouteTable.+\$\.eventName\s*=\s*.?ReplaceRoute.+\$\.eventName\s*=\s*.?ReplaceRouteTableAssociation.+\$\.eventName\s*=\s*.?DeleteRouteTable.+\$\.eventName\s*=\s*.?DeleteRoute.+\$\.eventName\s*=\s*.?DisassociateRouteTable.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_changes_to_vpcs_alarm_configured(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*CreateVpc.+\$\.eventName\s*=\s*DeleteVpc.+\$\.eventName\s*=\s*ModifyVpcAttribute.+\$\.eventName\s*=\s*AcceptVpcPeeringConnection.+\$\.eventName\s*=\s*CreateVpcPeeringConnection.+\$\.eventName\s*=\s*DeleteVpcPeeringConnection.+\$\.eventName\s*=\s*RejectVpcPeeringConnection.+\$\.eventName\s*=\s*AttachClassicLinkVpc.+\$\.eventName\s*=\s*DetachClassicLinkVpc.+\$\.eventName\s*=\s*DisableVpcClassicLink.+\$\.eventName\s*=\s*EnableVpcClassicLink"
pattern = r"\$\.eventName\s*=\s*.?CreateVpc.+\$\.eventName\s*=\s*.?DeleteVpc.+\$\.eventName\s*=\s*.?ModifyVpcAttribute.+\$\.eventName\s*=\s*.?AcceptVpcPeeringConnection.+\$\.eventName\s*=\s*.?CreateVpcPeeringConnection.+\$\.eventName\s*=\s*.?DeleteVpcPeeringConnection.+\$\.eventName\s*=\s*.?RejectVpcPeeringConnection.+\$\.eventName\s*=\s*.?AttachClassicLinkVpc.+\$\.eventName\s*=\s*.?DetachClassicLinkVpc.+\$\.eventName\s*=\s*.?DisableVpcClassicLink.+\$\.eventName\s*=\s*.?EnableVpcClassicLink.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -14,7 +14,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_change
Check
):
def execute(self):
pattern = r"\$\.eventSource\s*=\s*config.amazonaws.com.+\$\.eventName\s*=\s*StopConfigurationRecorder.+\$\.eventName\s*=\s*DeleteDeliveryChannel.+\$\.eventName\s*=\s*PutDeliveryChannel.+\$\.eventName\s*=\s*PutConfigurationRecorder"
pattern = r"\$\.eventSource\s*=\s*.?config.amazonaws.com.+\$\.eventName\s*=\s*.?StopConfigurationRecorder.+\$\.eventName\s*=\s*.?DeleteDeliveryChannel.+\$\.eventName\s*=\s*.?PutDeliveryChannel.+\$\.eventName\s*=\s*.?PutConfigurationRecorder.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -14,7 +14,7 @@ class cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_change
Check
):
def execute(self):
pattern = r"\$\.eventName\s*=\s*CreateTrail.+\$\.eventName\s*=\s*UpdateTrail.+\$\.eventName\s*=\s*DeleteTrail.+\$\.eventName\s*=\s*StartLogging.+\$\.eventName\s*=\s*StopLogging"
pattern = r"\$\.eventName\s*=\s*.?CreateTrail.+\$\.eventName\s*=\s*.?UpdateTrail.+\$\.eventName\s*=\s*.?DeleteTrail.+\$\.eventName\s*=\s*.?StartLogging.+\$\.eventName\s*=\s*.?StopLogging.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_authentication_failures(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*ConsoleLogin.+\$\.errorMessage\s*=\s*Failed authentication"
pattern = r"\$\.eventName\s*=\s*.?ConsoleLogin.+\$\.errorMessage\s*=\s*.?Failed authentication.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_aws_organizations_changes(Check):
def execute(self):
pattern = r"\$\.eventSource\s*=\s*organizations\.amazonaws\.com.+\$\.eventName\s*=\s*AcceptHandshake.+\$\.eventName\s*=\s*AttachPolicy.+\$\.eventName\s*=\s*CancelHandshake.+\$\.eventName\s*=\s*CreateAccount.+\$\.eventName\s*=\s*CreateOrganization.+\$\.eventName\s*=\s*CreateOrganizationalUnit.+\$\.eventName\s*=\s*CreatePolicy.+\$\.eventName\s*=\s*DeclineHandshake.+\$\.eventName\s*=\s*DeleteOrganization.+\$\.eventName\s*=\s*DeleteOrganizationalUnit.+\$\.eventName\s*=\s*DeletePolicy.+\$\.eventName\s*=\s*EnableAllFeatures.+\$\.eventName\s*=\s*EnablePolicyType.+\$\.eventName\s*=\s*InviteAccountToOrganization.+\$\.eventName\s*=\s*LeaveOrganization.+\$\.eventName\s*=\s*DetachPolicy.+\$\.eventName\s*=\s*DisablePolicyType.+\$\.eventName\s*=\s*MoveAccount.+\$\.eventName\s*=\s*RemoveAccountFromOrganization.+\$\.eventName\s*=\s*UpdateOrganizationalUnit.+\$\.eventName\s*=\s*UpdatePolicy"
pattern = r"\$\.eventSource\s*=\s*.?organizations\.amazonaws\.com.+\$\.eventName\s*=\s*.?AcceptHandshake.+\$\.eventName\s*=\s*.?AttachPolicy.+\$\.eventName\s*=\s*.?CancelHandshake.+\$\.eventName\s*=\s*.?CreateAccount.+\$\.eventName\s*=\s*.?CreateOrganization.+\$\.eventName\s*=\s*.?CreateOrganizationalUnit.+\$\.eventName\s*=\s*.?CreatePolicy.+\$\.eventName\s*=\s*.?DeclineHandshake.+\$\.eventName\s*=\s*.?DeleteOrganization.+\$\.eventName\s*=\s*.?DeleteOrganizationalUnit.+\$\.eventName\s*=\s*.?DeletePolicy.+\$\.eventName\s*=\s*.?EnableAllFeatures.+\$\.eventName\s*=\s*.?EnablePolicyType.+\$\.eventName\s*=\s*.?InviteAccountToOrganization.+\$\.eventName\s*=\s*.?LeaveOrganization.+\$\.eventName\s*=\s*.?DetachPolicy.+\$\.eventName\s*=\s*.?DisablePolicyType.+\$\.eventName\s*=\s*.?MoveAccount.+\$\.eventName\s*=\s*.?RemoveAccountFromOrganization.+\$\.eventName\s*=\s*.?UpdateOrganizationalUnit.+\$\.eventName\s*=\s*.?UpdatePolicy.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk(Check):
def execute(self):
pattern = r"\$\.eventSource\s*=\s*kms.amazonaws.com.+\$\.eventName\s*=\s*DisableKey.+\$\.eventName\s*=\s*ScheduleKeyDeletion"
pattern = r"\$\.eventSource\s*=\s*.?kms.amazonaws.com.+\$\.eventName\s*=\s*.?DisableKey.+\$\.eventName\s*=\s*.?ScheduleKeyDeletion.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_for_s3_bucket_policy_changes(Check):
def execute(self):
pattern = r"\$\.eventSource\s*=\s*s3.amazonaws.com.+\$\.eventName\s*=\s*PutBucketAcl.+\$\.eventName\s*=\s*PutBucketPolicy.+\$\.eventName\s*=\s*PutBucketCors.+\$\.eventName\s*=\s*PutBucketLifecycle.+\$\.eventName\s*=\s*PutBucketReplication.+\$\.eventName\s*=\s*DeleteBucketPolicy.+\$\.eventName\s*=\s*DeleteBucketCors.+\$\.eventName\s*=\s*DeleteBucketLifecycle.+\$\.eventName\s*=\s*DeleteBucketReplication"
pattern = r"\$\.eventSource\s*=\s*.?s3.amazonaws.com.+\$\.eventName\s*=\s*.?PutBucketAcl.+\$\.eventName\s*=\s*.?PutBucketPolicy.+\$\.eventName\s*=\s*.?PutBucketCors.+\$\.eventName\s*=\s*.?PutBucketLifecycle.+\$\.eventName\s*=\s*.?PutBucketReplication.+\$\.eventName\s*=\s*.?DeleteBucketPolicy.+\$\.eventName\s*=\s*.?DeleteBucketCors.+\$\.eventName\s*=\s*.?DeleteBucketLifecycle.+\$\.eventName\s*=\s*.?DeleteBucketReplication.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_policy_changes(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*DeleteGroupPolicy.+\$\.eventName\s*=\s*DeleteRolePolicy.+\$\.eventName\s*=\s*DeleteUserPolicy.+\$\.eventName\s*=\s*PutGroupPolicy.+\$\.eventName\s*=\s*PutRolePolicy.+\$\.eventName\s*=\s*PutUserPolicy.+\$\.eventName\s*=\s*CreatePolicy.+\$\.eventName\s*=\s*DeletePolicy.+\$\.eventName\s*=\s*CreatePolicyVersion.+\$\.eventName\s*=\s*DeletePolicyVersion.+\$\.eventName\s*=\s*AttachRolePolicy.+\$\.eventName\s*=\s*DetachRolePolicy.+\$\.eventName\s*=\s*AttachUserPolicy.+\$\.eventName\s*=\s*DetachUserPolicy.+\$\.eventName\s*=\s*AttachGroupPolicy.+\$\.eventName\s*=\s*DetachGroupPolicy"
pattern = r"\$\.eventName\s*=\s*.?DeleteGroupPolicy.+\$\.eventName\s*=\s*.?DeleteRolePolicy.+\$\.eventName\s*=\s*.?DeleteUserPolicy.+\$\.eventName\s*=\s*.?PutGroupPolicy.+\$\.eventName\s*=\s*.?PutRolePolicy.+\$\.eventName\s*=\s*.?PutUserPolicy.+\$\.eventName\s*=\s*.?CreatePolicy.+\$\.eventName\s*=\s*.?DeletePolicy.+\$\.eventName\s*=\s*.?CreatePolicyVersion.+\$\.eventName\s*=\s*.?DeletePolicyVersion.+\$\.eventName\s*=\s*.?AttachRolePolicy.+\$\.eventName\s*=\s*.?DetachRolePolicy.+\$\.eventName\s*=\s*.?AttachUserPolicy.+\$\.eventName\s*=\s*.?DetachUserPolicy.+\$\.eventName\s*=\s*.?AttachGroupPolicy.+\$\.eventName\s*=\s*.?DetachGroupPolicy.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_root_usage(Check):
def execute(self):
pattern = r"\$\.userIdentity\.type\s*=\s*Root.+\$\.userIdentity\.invokedBy NOT EXISTS.+\$\.eventType\s*!=\s*AwsServiceEvent"
pattern = r"\$\.userIdentity\.type\s*=\s*.?Root.+\$\.userIdentity\.invokedBy NOT EXISTS.+\$\.eventType\s*!=\s*.?AwsServiceEvent.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_security_group_changes(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*AuthorizeSecurityGroupIngress.+\$\.eventName\s*=\s*AuthorizeSecurityGroupEgress.+\$\.eventName\s*=\s*RevokeSecurityGroupIngress.+\$\.eventName\s*=\s*RevokeSecurityGroupEgress.+\$\.eventName\s*=\s*CreateSecurityGroup.+\$\.eventName\s*=\s*DeleteSecurityGroup"
pattern = r"\$\.eventName\s*=\s*.?AuthorizeSecurityGroupIngress.+\$\.eventName\s*=\s*.?AuthorizeSecurityGroupEgress.+\$\.eventName\s*=\s*.?RevokeSecurityGroupIngress.+\$\.eventName\s*=\s*.?RevokeSecurityGroupEgress.+\$\.eventName\s*=\s*.?CreateSecurityGroup.+\$\.eventName\s*=\s*.?DeleteSecurityGroup.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_sign_in_without_mfa(Check):
def execute(self):
pattern = r"\$\.eventName\s*=\s*ConsoleLogin.+\$\.additionalEventData\.MFAUsed\s*!=\s*Yes"
pattern = r"\$\.eventName\s*=\s*.?ConsoleLogin.+\$\.additionalEventData\.MFAUsed\s*!=\s*.?Yes.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -12,7 +12,7 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_metric_filter_unauthorized_api_calls(Check):
def execute(self):
pattern = r"\$\.errorCode\s*=\s*\*UnauthorizedOperation.+\$\.errorCode\s*=\s*AccessDenied\*"
pattern = r"\$\.errorCode\s*=\s*.?\*UnauthorizedOperation.+\$\.errorCode\s*=\s*.?AccessDenied\*.?"
findings = []
report = Check_Report_AWS(self.metadata())
report.status = "FAIL"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventName = "CreateNetworkAcl") || ($.eventName = "CreateNetworkAclEntry") || ($.eventName = "DeleteNetworkAcl") || ($.eventName = "DeleteNetworkAclEntry") || ($.eventName = "ReplaceNetworkAclEntry") || ($.eventName = "ReplaceNetworkAclAssociation") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_acls_alarm_configured.cloudwatch_changes_to_network_acls_alarm_configured import (
cloudwatch_changes_to_network_acls_alarm_configured,
)
check = cloudwatch_changes_to_network_acls_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateCustomerGateway") || ($.eventName = "DeleteCustomerGateway") || ($.eventName = "AttachInternetGateway") || ($.eventName = "CreateInternetGateway") || ($.eventName = "DeleteInternetGateway") || ($.eventName = "DetachInternetGateway") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_gateways_alarm_configured.cloudwatch_changes_to_network_gateways_alarm_configured import (
cloudwatch_changes_to_network_gateways_alarm_configured,
)
check = cloudwatch_changes_to_network_gateways_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateRoute") || ($.eventName = "CreateRouteTable") || ($.eventName = "ReplaceRoute") || ($.eventName = "ReplaceRouteTableAssociation")|| ($.eventName = "DeleteRouteTable") || ($.eventName = "DeleteRoute") || ($.eventName = "DisassociateRouteTable") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_network_route_tables_alarm_configured.cloudwatch_changes_to_network_route_tables_alarm_configured import (
cloudwatch_changes_to_network_route_tables_alarm_configured,
)
check = cloudwatch_changes_to_network_route_tables_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventName = "CreateVpc") || ($.eventName = "DeleteVpc") || ($.eventName = "ModifyVpcAttribute") || ($.eventName = "AcceptVpcPeeringConnection") || ($.eventName = "CreateVpcPeeringConnection") || ($.eventName = "DeleteVpcPeeringConnection") || ($.eventName = "RejectVpcPeeringConnection") || ($.eventName = "AttachClassicLinkVpc") || ($.eventName = "DetachClassicLinkVpc") || ($.eventName = "DisableVpcClassicLink") || ($.eventName = "EnableVpcClassicLink") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_changes_to_vpcs_alarm_configured.cloudwatch_changes_to_vpcs_alarm_configured import (
cloudwatch_changes_to_vpcs_alarm_configured,
)
check = cloudwatch_changes_to_vpcs_alarm_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -300,3 +300,82 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "config.amazonaws.com") && (($.eventName="StopConfigurationRecorder")||($.eventName="DeleteDeliveryChannel")|| ($.eventName="PutDeliveryChannel")||($.eventName="PutConfigurationRecorder"))}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled import (
cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled,
)
check = (
cloudwatch_log_metric_filter_and_alarm_for_aws_config_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -300,3 +300,82 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "CreateTrail") || ($.eventName = "UpdateTrail") || ($.eventName = "DeleteTrail") || ($.eventName = "StartLogging") || ($.eventName = "StopLogging")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled.cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled import (
cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled,
)
check = (
cloudwatch_log_metric_filter_and_alarm_for_cloudtrail_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "ConsoleLogin") && ($.errorMessage = "Failed authentication")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_authentication_failures.cloudwatch_log_metric_filter_authentication_failures import (
cloudwatch_log_metric_filter_authentication_failures,
)
check = cloudwatch_log_metric_filter_authentication_failures()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_aws_organizations_changes:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventSource = "organizations.amazonaws.com") && ($.eventName = "AcceptHandshake") || ($.eventName = "AttachPolicy") || ($.eventName = "CancelHandshake") || ($.eventName = "CreateAccount") || ($.eventName = "CreateOrganization") || ($.eventName = "CreateOrganizationalUnit") || ($.eventName = "CreatePolicy") || ($.eventName = "DeclineHandshake") || ($.eventName = "DeleteOrganization") || ($.eventName = "DeleteOrganizationalUnit") || ($.eventName = "DeletePolicy") || ($.eventName = "EnableAllFeatures") || ($.eventName = "EnablePolicyType") || ($.eventName = "InviteAccountToOrganization") || ($.eventName = "LeaveOrganization") || ($.eventName = "DetachPolicy") || ($.eventName = "DisablePolicyType") || ($.eventName = "MoveAccount") || ($.eventName = "RemoveAccountFromOrganization") || ($.eventName = "UpdateOrganizationalUnit") || ($.eventName = "UpdatePolicy") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_aws_organizations_changes.cloudwatch_log_metric_filter_aws_organizations_changes import (
cloudwatch_log_metric_filter_aws_organizations_changes,
)
check = cloudwatch_log_metric_filter_aws_organizations_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -300,3 +300,82 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "kms.amazonaws.com") &&(($.eventName="DisableKey")||($.eventName="ScheduleKeyDeletion")) }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk.cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk import (
cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk,
)
check = (
cloudwatch_log_metric_filter_disable_or_scheduled_deletion_of_kms_cmk()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventSource = "s3.amazonaws.com") && (($.eventName = "PutBucketAcl") || ($.eventName = "PutBucketPolicy") || ($.eventName = "PutBucketCors") || ($.eventName = "PutBucketLifecycle") || ($.eventName = "PutBucketReplication") || ($.eventName = "DeleteBucketPolicy") || ($.eventName = "DeleteBucketCors") || ($.eventName = "DeleteBucketLifecycle") || ($.eventName = "DeleteBucketReplication")) }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes.cloudwatch_log_metric_filter_for_s3_bucket_policy_changes import (
cloudwatch_log_metric_filter_for_s3_bucket_policy_changes,
)
check = cloudwatch_log_metric_filter_for_s3_bucket_policy_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName="DeleteGroupPolicy")||($.eventName="DeleteRolePolicy")||($.eventName="DeleteUserPolicy")||($.eventName="PutGroupPolicy")||($.eventName="PutRolePolicy")||($.eventName="PutUserPolicy")||($.eventName="CreatePolicy")||($.eventName="DeletePolicy")||($.eventName="CreatePolicyVersion")||($.eventName="DeletePolicyVersion")||($.eventName="AttachRolePolicy")||($.eventName="DetachRolePolicy")||($.eventName="AttachUserPolicy")||($.eventName="DetachUserPolicy")||($.eventName="AttachGroupPolicy")||($.eventName="DetachGroupPolicy")}',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_policy_changes.cloudwatch_log_metric_filter_policy_changes import (
cloudwatch_log_metric_filter_policy_changes,
)
check = cloudwatch_log_metric_filter_policy_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ $.userIdentity.type = "Root" && $.userIdentity.invokedBy NOT EXISTS && $.eventType != "AwsServiceEvent" }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_root_usage.cloudwatch_log_metric_filter_root_usage import (
cloudwatch_log_metric_filter_root_usage,
)
check = cloudwatch_log_metric_filter_root_usage()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{($.eventName = "AuthorizeSecurityGroupIngress") || ($.eventName = "AuthorizeSecurityGroupEgress") || ($.eventName = "RevokeSecurityGroupIngress") || ($.eventName = "RevokeSecurityGroupEgress") || ($.eventName = "CreateSecurityGroup") || ($.eventName = "DeleteSecurityGroup") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_security_group_changes.cloudwatch_log_metric_filter_security_group_changes import (
cloudwatch_log_metric_filter_security_group_changes,
)
check = cloudwatch_log_metric_filter_security_group_changes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.eventName = "ConsoleLogin") && ($.additionalEventData.MFAUsed != "Yes") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_sign_in_without_mfa.cloudwatch_log_metric_filter_sign_in_without_mfa import (
cloudwatch_log_metric_filter_sign_in_without_mfa,
)
check = cloudwatch_log_metric_filter_sign_in_without_mfa()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"

View File

@@ -290,3 +290,80 @@ class Test_cloudwatch_log_metric_filter_unauthorized_api_calls:
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"
@mock_logs
@mock_cloudtrail
@mock_cloudwatch
@mock_s3
def test_cloudwatch_trail_with_log_group_with_metric_and_alarm_with_quotes(self):
cloudtrail_client = client("cloudtrail", region_name=AWS_REGION)
cloudwatch_client = client("cloudwatch", region_name=AWS_REGION)
logs_client = client("logs", region_name=AWS_REGION)
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
logs_client.create_log_group(logGroupName="/log-group/test")
cloudtrail_client.create_trail(
Name="test_trail",
S3BucketName="test",
CloudWatchLogsLogGroupArn=f"arn:aws:logs:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:log-group:/log-group/test:*",
)
logs_client.put_metric_filter(
logGroupName="/log-group/test",
filterName="test-filter",
filterPattern='{ ($.errorCode = "*UnauthorizedOperation") || ($.errorCode = "AccessDenied*") || ($.sourceIPAddress!="delivery.logs.amazonaws.com") || ($.eventName!="HeadBucket") }',
metricTransformations=[
{
"metricName": "my-metric",
"metricNamespace": "my-namespace",
"metricValue": "$.value",
}
],
)
cloudwatch_client.put_metric_alarm(
AlarmName="test-alarm",
MetricName="my-metric",
Namespace="my-namespace",
Period=10,
EvaluationPeriods=5,
Statistic="Average",
Threshold=2,
ComparisonOperator="GreaterThanThreshold",
ActionsEnabled=True,
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
Logs,
)
current_audit_info.audited_partition = "aws"
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
Cloudtrail,
)
with mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.logs_client",
new=Logs(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_client",
new=CloudWatch(current_audit_info),
), mock.patch(
"prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudwatch.cloudwatch_log_metric_filter_unauthorized_api_calls.cloudwatch_log_metric_filter_unauthorized_api_calls import (
cloudwatch_log_metric_filter_unauthorized_api_calls,
)
check = cloudwatch_log_metric_filter_unauthorized_api_calls()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "CloudWatch log group /log-group/test found with metric filter test-filter and alarms set."
)
assert result[0].resource_id == "/log-group/test"