mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-11 07:15:15 +00:00
chore(test): add CloudWatch and Logs tests (#2264)
This commit is contained in:
@@ -1,8 +1,11 @@
|
||||
from boto3 import session
|
||||
from moto import mock_cloudwatch
|
||||
from boto3 import client, session
|
||||
from moto import mock_cloudwatch, mock_logs
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import CloudWatch
|
||||
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
|
||||
CloudWatch,
|
||||
Logs,
|
||||
)
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
@@ -54,8 +57,8 @@ class Test_CloudWatch_Service:
|
||||
# CloudWatch client for this test class
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudwatch = CloudWatch(audit_info)
|
||||
for client in cloudwatch.regional_clients.values():
|
||||
assert client.__class__.__name__ == "CloudWatch"
|
||||
for client_ in cloudwatch.regional_clients.values():
|
||||
assert client_.__class__.__name__ == "CloudWatch"
|
||||
|
||||
# Test CloudWatch Session
|
||||
@mock_cloudwatch
|
||||
@@ -72,3 +75,127 @@ class Test_CloudWatch_Service:
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudwatch = CloudWatch(audit_info)
|
||||
assert cloudwatch.audited_account == AWS_ACCOUNT_NUMBER
|
||||
|
||||
# Test Logs Service
|
||||
@mock_logs
|
||||
def test_logs_service(self):
|
||||
# Logs client for this test class
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
assert logs.service == "logs"
|
||||
|
||||
# Test Logs Client
|
||||
@mock_logs
|
||||
def test_logs_client(self):
|
||||
# Logs client for this test class
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
for client_ in logs.regional_clients.values():
|
||||
assert client_.__class__.__name__ == "CloudWatchLogs"
|
||||
|
||||
# Test Logs Session
|
||||
@mock_logs
|
||||
def test__logs_get_session__(self):
|
||||
# Logs client for this test class
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
assert logs.session.__class__.__name__ == "Session"
|
||||
|
||||
# Test Logs Session
|
||||
@mock_logs
|
||||
def test_logs_audited_account(self):
|
||||
# Logs client for this test class
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
assert logs.audited_account == AWS_ACCOUNT_NUMBER
|
||||
|
||||
# Test CloudWatch Alarms
|
||||
@mock_cloudwatch
|
||||
def test__describe_alarms__(self):
|
||||
# CloudWatch client for this test class
|
||||
cw_client = client("cloudwatch", region_name=AWS_REGION)
|
||||
cw_client.put_metric_alarm(
|
||||
AlarmActions=["arn:alarm"],
|
||||
AlarmDescription="A test",
|
||||
AlarmName="test",
|
||||
ComparisonOperator="GreaterThanOrEqualToThreshold",
|
||||
Dimensions=[{"Name": "InstanceId", "Value": "i-0123457"}],
|
||||
EvaluationPeriods=5,
|
||||
InsufficientDataActions=["arn:insufficient"],
|
||||
Namespace="test_namespace",
|
||||
MetricName="test_metric",
|
||||
OKActions=["arn:ok"],
|
||||
Period=60,
|
||||
Statistic="Average",
|
||||
Threshold=2,
|
||||
Unit="Seconds",
|
||||
Tags=[{"Key": "key-1", "Value": "value-1"}],
|
||||
)
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
cloudwatch = CloudWatch(audit_info)
|
||||
assert len(cloudwatch.metric_alarms) == 1
|
||||
assert (
|
||||
cloudwatch.metric_alarms[0].arn
|
||||
== f"arn:aws:cloudwatch:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:alarm:test"
|
||||
)
|
||||
assert cloudwatch.metric_alarms[0].name == "test"
|
||||
assert cloudwatch.metric_alarms[0].metric == "test_metric"
|
||||
assert cloudwatch.metric_alarms[0].name_space == "test_namespace"
|
||||
assert cloudwatch.metric_alarms[0].region == AWS_REGION
|
||||
assert cloudwatch.metric_alarms[0].tags == [
|
||||
{"Key": "key-1", "Value": "value-1"}
|
||||
]
|
||||
|
||||
# Test Logs Filters
|
||||
@mock_logs
|
||||
def test__describe_metric_filters__(self):
|
||||
# Logs client for this test class
|
||||
logs_client = client("logs", region_name=AWS_REGION)
|
||||
logs_client.put_metric_filter(
|
||||
logGroupName="/log-group/test",
|
||||
filterName="test-filter",
|
||||
filterPattern="test-pattern",
|
||||
metricTransformations=[
|
||||
{
|
||||
"metricName": "my-metric",
|
||||
"metricNamespace": "my-namespace",
|
||||
"metricValue": "$.value",
|
||||
}
|
||||
],
|
||||
)
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
assert len(logs.metric_filters) == 1
|
||||
assert logs.metric_filters[0].log_group == "/log-group/test"
|
||||
assert logs.metric_filters[0].name == "test-filter"
|
||||
assert logs.metric_filters[0].metric == "my-metric"
|
||||
assert logs.metric_filters[0].pattern == "test-pattern"
|
||||
assert logs.metric_filters[0].region == AWS_REGION
|
||||
|
||||
# Test Logs Filters
|
||||
@mock_logs
|
||||
def test__describe_log_groups__(self):
|
||||
# Logs client for this test class
|
||||
logs_client = client("logs", region_name=AWS_REGION)
|
||||
logs_client.create_log_group(
|
||||
logGroupName="/log-group/test",
|
||||
kmsKeyId="test_kms_id",
|
||||
tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"},
|
||||
)
|
||||
logs_client.put_retention_policy(
|
||||
logGroupName="/log-group/test", retentionInDays=400
|
||||
)
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
logs = Logs(audit_info)
|
||||
assert len(logs.log_groups) == 1
|
||||
assert (
|
||||
logs.log_groups[0].arn
|
||||
== f"arn:aws:logs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test"
|
||||
)
|
||||
assert logs.log_groups[0].name == "/log-group/test"
|
||||
assert logs.log_groups[0].retention_days == 400
|
||||
assert logs.log_groups[0].kms_id == "test_kms_id"
|
||||
assert logs.log_groups[0].region == AWS_REGION
|
||||
assert logs.log_groups[0].tags == [
|
||||
{"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"}
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user