fix(log_group_retention): handle log groups that never expire (#2272)

This commit is contained in:
Pepe Fagoaga
2023-04-25 10:45:43 +02:00
committed by GitHub
parent 662e67ff16
commit 58cad1a6b3
4 changed files with 62 additions and 11 deletions

View File

@@ -13,11 +13,17 @@ class cloudwatch_log_group_retention_policy_specific_days_enabled(Check):
report.resource_id = log_group.name
report.resource_arn = log_group.arn
report.resource_tags = log_group.tags
if log_group.retention_days < specific_retention_days:
if (
log_group.never_expire is False
and log_group.retention_days < specific_retention_days
):
report.status = "FAIL"
report.status_extended = f"Log Group {log_group.name} has less than {specific_retention_days} days retention period ({log_group.retention_days} days)."
else:
report.status = "PASS"
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it has {log_group.retention_days} days."
if log_group.never_expire is True:
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it never expires."
else:
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it has {log_group.retention_days} days."
findings.append(report)
return findings

View File

@@ -152,17 +152,18 @@ class Logs:
if not self.audit_resources or (
is_resource_filtered(log_group["arn"], self.audit_resources)
):
kms = None
retention_days = 0
if "kmsKeyId" in log_group:
kms = log_group["kmsKeyId"]
if "retentionInDays" in log_group:
retention_days = log_group["retentionInDays"]
never_expire = False
kms = log_group.get("kmsKeyId")
retention_days = log_group.get("retentionInDays")
if not retention_days:
never_expire = True
retention_days = 9999
self.log_groups.append(
LogGroup(
arn=log_group["arn"],
name=log_group["logGroupName"],
retention_days=retention_days,
never_expire=never_expire,
kms_id=kms,
region=regional_client.region,
)
@@ -240,6 +241,7 @@ class LogGroup(BaseModel):
arn: str
name: str
retention_days: int
never_expire: bool
kms_id: Optional[str]
region: str
log_streams: dict[

View File

@@ -66,7 +66,7 @@ class Test_cloudwatch_log_group_retention_policy_specific_days_enabled:
assert len(result) == 0
@mock_logs
def test_cloudwatch_log_group_without_retention_days(self):
def test_cloudwatch_log_group_without_retention_days_never_expires(self):
# Generate Logs Client
logs_client = client("logs", region_name=AWS_REGION)
# Request Logs group
@@ -103,12 +103,17 @@ class Test_cloudwatch_log_group_retention_policy_specific_days_enabled:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Log Group test has less than 365 days retention period (0 days)."
== "Log Group test comply with 365 days retention period since it never expires."
)
assert result[0].resource_id == "test"
assert (
result[0].resource_arn
== f"arn:aws:logs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:log-group:test"
)
assert result[0].region == AWS_REGION
@mock_logs
def test_cloudwatch_log_group_with_compliant_retention_days(self):
@@ -155,6 +160,11 @@ class Test_cloudwatch_log_group_retention_policy_specific_days_enabled:
== "Log Group test comply with 365 days retention period since it has 400 days."
)
assert result[0].resource_id == "test"
assert (
result[0].resource_arn
== f"arn:aws:logs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:log-group:test"
)
assert result[0].region == AWS_REGION
@mock_logs
def test_cloudwatch_log_group_with_no_compliant_retention_days(self):
@@ -201,3 +211,8 @@ class Test_cloudwatch_log_group_retention_policy_specific_days_enabled:
== "Log Group test has less than 365 days retention period (7 days)."
)
assert result[0].resource_id == "test"
assert (
result[0].resource_arn
== f"arn:aws:logs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:log-group:test"
)
assert result[0].region == AWS_REGION

View File

@@ -195,6 +195,34 @@ class Test_CloudWatch_Service:
assert logs.log_groups[0].name == "/log-group/test"
assert logs.log_groups[0].retention_days == 400
assert logs.log_groups[0].kms_id == "test_kms_id"
assert not logs.log_groups[0].never_expire
assert logs.log_groups[0].region == AWS_REGION
assert logs.log_groups[0].tags == [
{"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"}
]
@mock_logs
def test__describe_log_groups__never_expire(self):
# Logs client for this test class
logs_client = client("logs", region_name=AWS_REGION)
logs_client.create_log_group(
logGroupName="/log-group/test",
kmsKeyId="test_kms_id",
tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"},
)
audit_info = self.set_mocked_audit_info()
logs = Logs(audit_info)
assert len(logs.log_groups) == 1
assert (
logs.log_groups[0].arn
== f"arn:aws:logs:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:log-group:/log-group/test"
)
assert logs.log_groups[0].name == "/log-group/test"
assert logs.log_groups[0].never_expire
# Since it never expires we don't use the retention_days
assert logs.log_groups[0].retention_days == 9999
assert logs.log_groups[0].kms_id == "test_kms_id"
assert logs.log_groups[0].region == AWS_REGION
assert logs.log_groups[0].tags == [
{"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"}