fix(guardduty): handle disabled detectors in guardduty_is_enabled (#2616)

This commit is contained in:
Sergio Garcia
2023-07-25 12:26:37 +02:00
committed by GitHub
parent 50b8e084e7
commit 6328ef4444
5 changed files with 124 additions and 90 deletions

View File

@@ -6,22 +6,23 @@ class guardduty_centrally_managed(Check):
def execute(self):
findings = []
for detector in guardduty_client.detectors:
report = Check_Report_AWS(self.metadata())
report.region = detector.region
report.resource_id = detector.id
report.resource_arn = detector.arn
report.resource_tags = detector.tags
report.status = "FAIL"
report.status_extended = (
f"GuardDuty detector {detector.id} is not centrally managed"
)
if detector.administrator_account:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is centrally managed by account {detector.administrator_account}"
elif detector.member_accounts:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is administrator account with {len(detector.member_accounts)} member accounts"
if detector.id:
report = Check_Report_AWS(self.metadata())
report.region = detector.region
report.resource_id = detector.id
report.resource_arn = detector.arn
report.resource_tags = detector.tags
report.status = "FAIL"
report.status_extended = (
f"GuardDuty detector {detector.id} is not centrally managed"
)
if detector.administrator_account:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is centrally managed by account {detector.administrator_account}"
elif detector.member_accounts:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is administrator account with {len(detector.member_accounts)} member accounts"
findings.append(report)
findings.append(report)
return findings

View File

@@ -13,7 +13,10 @@ class guardduty_is_enabled(Check):
report.resource_tags = detector.tags
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} enabled"
if detector.status is None:
if not detector.id:
report.status = "FAIL"
report.status_extended = "GuardDuty is not enabled"
elif detector.status is None:
report.status = "FAIL"
report.status_extended = (
f"GuardDuty detector {detector.id} not configured"

View File

@@ -6,17 +6,18 @@ class guardduty_no_high_severity_findings(Check):
def execute(self):
findings = []
for detector in guardduty_client.detectors:
report = Check_Report_AWS(self.metadata())
report.region = detector.region
report.resource_id = detector.id
report.resource_arn = detector.arn
report.resource_tags = detector.tags
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} does not have high severity findings."
if len(detector.findings) > 0:
report.status = "FAIL"
report.status_extended = f"GuardDuty detector {detector.id} has {str(len(detector.findings))} high severity findings"
if detector.id:
report = Check_Report_AWS(self.metadata())
report.region = detector.region
report.resource_id = detector.id
report.resource_arn = detector.arn
report.resource_tags = detector.tags
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} does not have high severity findings."
if len(detector.findings) > 0:
report.status = "FAIL"
report.status_extended = f"GuardDuty detector {detector.id} has {str(len(detector.findings))} high severity findings"
findings.append(report)
findings.append(report)
return findings

View File

@@ -40,9 +40,11 @@ class GuardDuty:
def __list_detectors__(self, regional_client):
logger.info("GuardDuty - listing detectors...")
try:
detectors = False
list_detectors_paginator = regional_client.get_paginator("list_detectors")
for page in list_detectors_paginator.paginate():
for detector in page["DetectorIds"]:
detectors = True
arn = f"arn:{self.audited_partition}:guardduty:{regional_client.region}:{self.audited_account}:detector/{detector}"
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
@@ -52,6 +54,10 @@ class GuardDuty:
id=detector, arn=arn, region=regional_client.region
)
)
if not detectors:
self.detectors.append(
Detector(id="", arn="", region=regional_client.region)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -61,10 +67,14 @@ class GuardDuty:
logger.info("GuardDuty - getting detector info...")
try:
for detector in self.detectors:
regional_client = self.regional_clients[detector.region]
detector_info = regional_client.get_detector(DetectorId=detector.id)
if "Status" in detector_info and detector_info["Status"] == "ENABLED":
detector.status = True
if detector.id:
regional_client = self.regional_clients[detector.region]
detector_info = regional_client.get_detector(DetectorId=detector.id)
if (
"Status" in detector_info
and detector_info["Status"] == "ENABLED"
):
detector.status = True
except Exception as error:
logger.error(
@@ -75,23 +85,26 @@ class GuardDuty:
logger.info("GuardDuty - getting administrator account...")
try:
for detector in self.detectors:
try:
regional_client = self.regional_clients[detector.region]
detector_administrator = regional_client.get_administrator_account(
DetectorId=detector.id
)
detector_administrator_account = detector_administrator.get(
"Administrator"
)
if detector_administrator_account:
detector.administrator_account = (
detector_administrator_account.get("AccountId")
if detector.id:
try:
regional_client = self.regional_clients[detector.region]
detector_administrator = (
regional_client.get_administrator_account(
DetectorId=detector.id
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
detector_administrator_account = detector_administrator.get(
"Administrator"
)
if detector_administrator_account:
detector.administrator_account = (
detector_administrator_account.get("AccountId")
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
@@ -102,21 +115,22 @@ class GuardDuty:
logger.info("GuardDuty - listing members...")
try:
for detector in self.detectors:
try:
regional_client = self.regional_clients[detector.region]
list_members_paginator = regional_client.get_paginator(
"list_members"
)
for page in list_members_paginator.paginate(
DetectorId=detector.id,
):
for member in page["Members"]:
detector.member_accounts.append(member.get("AccountId"))
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
if detector.id:
try:
regional_client = self.regional_clients[detector.region]
list_members_paginator = regional_client.get_paginator(
"list_members"
)
for page in list_members_paginator.paginate(
DetectorId=detector.id,
):
for member in page["Members"]:
detector.member_accounts.append(member.get("AccountId"))
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
@@ -126,27 +140,30 @@ class GuardDuty:
logger.info("GuardDuty - listing findings...")
try:
for detector in self.detectors:
regional_client = self.regional_clients[detector.region]
list_findings_paginator = regional_client.get_paginator("list_findings")
for page in list_findings_paginator.paginate(
DetectorId=detector.id,
FindingCriteria={
"Criterion": {
"severity": {
"Eq": [
"8",
],
},
"service.archived": {
"Eq": [
"false",
],
},
}
},
):
for finding in page["FindingIds"]:
detector.findings.append(finding)
if detector.id:
regional_client = self.regional_clients[detector.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"
)
for page in list_findings_paginator.paginate(
DetectorId=detector.id,
FindingCriteria={
"Criterion": {
"severity": {
"Eq": [
"8",
],
},
"service.archived": {
"Eq": [
"false",
],
},
}
},
):
for finding in page["FindingIds"]:
detector.findings.append(finding)
except Exception as error:
logger.error(
@@ -157,11 +174,12 @@ class GuardDuty:
logger.info("Guardduty - List Tags...")
try:
for detector in self.detectors:
regional_client = self.regional_clients[detector.region]
response = regional_client.list_tags_for_resource(
ResourceArn=detector.arn
)["Tags"]
detector.tags = [response]
if detector.arn:
regional_client = self.regional_clients[detector.region]
response = regional_client.list_tags_for_resource(
ResourceArn=detector.arn
)["Tags"]
detector.tags = [response]
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"

View File

@@ -17,6 +17,13 @@ class Test_guardduty_is_enabled:
def test_no_detectors(self):
guardduty_client = mock.MagicMock
guardduty_client.detectors = []
guardduty_client.detectors.append(
Detector(
id="",
region=AWS_REGION,
arn="",
)
)
with mock.patch(
"prowler.providers.aws.services.guardduty.guardduty_service.GuardDuty",
guardduty_client,
@@ -27,7 +34,11 @@ class Test_guardduty_is_enabled:
check = guardduty_is_enabled()
result = check.execute()
assert len(result) == 0
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("is not enabled", result[0].status_extended)
assert result[0].resource_id == ""
assert result[0].resource_arn == ""
def test_guardduty_enabled(self):
guardduty_client = mock.MagicMock