feat(check): New GuardDuty check guardduty_centrally_managed (#2195)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Gabriel Soltz
2023-04-14 14:30:51 +02:00
committed by GitHub
parent 29c9ad602d
commit 428fda81e2
9 changed files with 313 additions and 13 deletions

View File

@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "guardduty_centrally_managed",
"CheckTitle": "GuardDuty is centrally managed",
"CheckType": [],
"ServiceName": "guardduty",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:guardduty:region:account-id/detector-id",
"Severity": "medium",
"ResourceType": "AwsGuardDutyDetector",
"Description": "GuardDuty is centrally managed",
"Risk": "If GuardDuty is not centrally managed, it is not possible to centrally manage the GuardDuty findings, settings, and member accounts.",
"RelatedUrl": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_accounts.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure GuardDuty to be centrally managed",
"Url": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_accounts.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.guardduty.guardduty_client import guardduty_client
class guardduty_centrally_managed(Check):
def execute(self):
findings = []
for detector in guardduty_client.detectors:
report = Check_Report_AWS(self.metadata())
report.region = detector.region
report.resource_id = detector.id
report.resource_arn = detector.arn
report.resource_tags = detector.tags
report.status = "FAIL"
report.status_extended = (
f"GuardDuty detector {detector.id} is not centrally managed"
)
if detector.administrator_account:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is centrally managed by account {detector.administrator_account}"
elif detector.member_accounts:
report.status = "PASS"
report.status_extended = f"GuardDuty detector {detector.id} is administrator account with {len(detector.member_accounts)} member accounts"
findings.append(report)
return findings

View File

@@ -5,7 +5,7 @@
"CheckType": [],
"ServiceName": "guardduty",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:sagemaker:region:account-id",
"ResourceIdTemplate": "arn:aws:guardduty:region:account-id/detector-id",
"Severity": "medium",
"ResourceType": "AwsGuardDutyDetector",
"Description": "Check if GuardDuty is enabled",

View File

@@ -5,7 +5,7 @@
"CheckType": [],
"ServiceName": "guardduty",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:sagemaker:region:account-id",
"ResourceIdTemplate": "arn:aws:guardduty:region:account-id/detector-id",
"Severity": "high",
"ResourceType": "AwsGuardDutyDetector",
"Description": "There are High severity GuardDuty findings ",

View File

@@ -19,8 +19,10 @@ class GuardDuty:
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.detectors = []
self.__threading_call__(self.__list_detectors__)
self.__get_detector__(self.regional_clients)
self.__list_findings__(self.regional_clients)
self.__get_detector__()
self.__list_findings__()
self.__list_members__()
self.__get_administrator_account__()
self.__list_tags_for_resource__()
def __get_session__(self):
@@ -55,25 +57,76 @@ class GuardDuty:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_detector__(self, regional_clients):
def __get_detector__(self):
logger.info("GuardDuty - getting detector info...")
try:
for detector in self.detectors:
regional_client = regional_clients[detector.region]
regional_client = self.regional_clients[detector.region]
detector_info = regional_client.get_detector(DetectorId=detector.id)
if "Status" in detector_info and detector_info["Status"] == "ENABLED":
detector.status = True
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __list_findings__(self, regional_clients):
def __get_administrator_account__(self):
logger.info("GuardDuty - getting administrator account...")
try:
for detector in self.detectors:
try:
regional_client = self.regional_clients[detector.region]
detector_administrator = regional_client.get_administrator_account(
DetectorId=detector.id
)
detector_administrator_account = detector_administrator.get(
"Administrator"
)
if detector_administrator_account:
detector.administrator_account = (
detector_administrator_account.get("AccountId")
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __list_members__(self):
logger.info("GuardDuty - listing members...")
try:
for detector in self.detectors:
try:
regional_client = self.regional_clients[detector.region]
list_members_paginator = regional_client.get_paginator(
"list_members"
)
for page in list_members_paginator.paginate(
DetectorId=detector.id,
):
for member in page["Members"]:
detector.member_accounts.append(member.get("AccountId"))
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __list_findings__(self):
logger.info("GuardDuty - listing findings...")
try:
for detector in self.detectors:
regional_client = regional_clients[detector.region]
regional_client = self.regional_clients[detector.region]
list_findings_paginator = regional_client.get_paginator("list_findings")
for page in list_findings_paginator.paginate(
DetectorId=detector.id,
@@ -97,7 +150,7 @@ class GuardDuty:
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __list_tags_for_resource__(self):
@@ -111,7 +164,7 @@ class GuardDuty:
detector.tags = [response]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
@@ -121,4 +174,6 @@ class Detector(BaseModel):
region: str
status: bool = None
findings: list = []
member_accounts: list = []
administrator_account: str = None
tags: Optional[list] = []

View File

@@ -2,11 +2,11 @@ from unittest.mock import patch
import botocore
from boto3 import session
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
AccessAnalyzer,
)
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
# Mock Test Region
AWS_REGION = "eu-west-1"

View File

@@ -0,0 +1,133 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.aws.services.guardduty.guardduty_service import Detector
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_NUMBER_ADMIN = "123456789013"
DETECTOR_ID = str(uuid4())
DETECTOR_ARN = (
f"arn:aws:guardduty:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:detector/{DETECTOR_ID}"
)
class Test_guardduty_centrally_managed:
def test_no_detectors(self):
guardduty_client = mock.MagicMock
guardduty_client.detectors = []
with mock.patch(
"prowler.providers.aws.services.guardduty.guardduty_service.GuardDuty",
guardduty_client,
):
from prowler.providers.aws.services.guardduty.guardduty_no_high_severity_findings.guardduty_no_high_severity_findings import (
guardduty_no_high_severity_findings,
)
check = guardduty_no_high_severity_findings()
result = check.execute()
assert len(result) == 0
def test_detector_no_centralized_managed(self):
guardduty_client = mock.MagicMock
guardduty_client.detectors = []
guardduty_client.detectors.append(
Detector(
id=DETECTOR_ID,
region=AWS_REGION,
arn=DETECTOR_ARN,
status=False,
findings=[str(uuid4())],
)
)
with mock.patch(
"prowler.providers.aws.services.guardduty.guardduty_service.GuardDuty",
guardduty_client,
):
# Test Check
from prowler.providers.aws.services.guardduty.guardduty_centrally_managed.guardduty_centrally_managed import (
guardduty_centrally_managed,
)
check = guardduty_centrally_managed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"GuardDuty detector {DETECTOR_ID} is not centrally managed"
)
assert result[0].resource_id == DETECTOR_ID
assert result[0].region == AWS_REGION
assert result[0].resource_arn == DETECTOR_ARN
def test_detector_centralized_managed(self):
guardduty_client = mock.MagicMock
guardduty_client.detectors = []
guardduty_client.detectors.append(
Detector(
id=DETECTOR_ID,
region=AWS_REGION,
arn=DETECTOR_ARN,
status=False,
findings=[str(uuid4())],
administrator_account=AWS_ACCOUNT_NUMBER_ADMIN,
)
)
with mock.patch(
"prowler.providers.aws.services.guardduty.guardduty_service.GuardDuty",
guardduty_client,
):
# Test Check
from prowler.providers.aws.services.guardduty.guardduty_centrally_managed.guardduty_centrally_managed import (
guardduty_centrally_managed,
)
check = guardduty_centrally_managed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"GuardDuty detector {DETECTOR_ID} is centrally managed by account {AWS_ACCOUNT_NUMBER_ADMIN}"
)
assert result[0].resource_id == DETECTOR_ID
assert result[0].region == AWS_REGION
assert result[0].resource_arn == DETECTOR_ARN
def test_detector_administrator(self):
guardduty_client = mock.MagicMock
guardduty_client.detectors = []
guardduty_client.detectors.append(
Detector(
id=DETECTOR_ID,
region=AWS_REGION,
arn=DETECTOR_ARN,
status=False,
findings=[str(uuid4())],
member_accounts=[AWS_ACCOUNT_NUMBER_ADMIN],
)
)
with mock.patch(
"prowler.providers.aws.services.guardduty.guardduty_service.GuardDuty",
guardduty_client,
):
# Test Check
from prowler.providers.aws.services.guardduty.guardduty_centrally_managed.guardduty_centrally_managed import (
guardduty_centrally_managed,
)
check = guardduty_centrally_managed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"GuardDuty detector {DETECTOR_ID} is administrator account with 1 member accounts"
)
assert result[0].resource_id == DETECTOR_ID
assert result[0].region == AWS_REGION
assert result[0].resource_arn == DETECTOR_ARN

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from unittest.mock import patch
import botocore
@@ -7,6 +8,7 @@ from moto import mock_guardduty
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
AWS_ACCOUNT_NUMBER_ADMIN = "123456789013"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "eu-west-1"
@@ -18,6 +20,30 @@ def mock_make_api_call(self, operation_name, kwarg):
return {"FindingIds": ["86c1d16c9ec63f634ccd087ae0d427ba1"]}
if operation_name == "ListTagsForResource":
return {"Tags": {"test": "test"}}
if operation_name == "ListMembers":
return {
"Members": [
{
"AccountId": AWS_ACCOUNT_NUMBER,
"DetectorId": "11b4a9318fd146914420a637a4a9248b",
"MasterId": AWS_ACCOUNT_NUMBER_ADMIN,
"Email": "security@prowler.com",
"RelationshipStatus": "Enabled",
"InvitedAt": datetime(2020, 1, 1),
"UpdatedAt": datetime(2021, 1, 1),
"AdministratorId": AWS_ACCOUNT_NUMBER_ADMIN,
},
],
}
if operation_name == "GetAdministratorAccount":
return {
"Administrator": {
"AccountId": AWS_ACCOUNT_NUMBER_ADMIN,
"InvitationId": "12b1a931a981d1e1f1f452cf2fb3d515",
"RelationshipStatus": "Enabled",
"InvitedAt": datetime(2020, 1, 1),
}
}
return make_api_call(self, operation_name, kwarg)
@@ -117,3 +143,32 @@ class Test_GuardDuty_Service:
assert guardduty.detectors[0].region == AWS_REGION
assert guardduty.detectors[0].status
assert len(guardduty.detectors[0].findings) == 1
@mock_guardduty
def test__list_members__(self):
guardduty_client = client("guardduty", region_name=AWS_REGION)
response = guardduty_client.create_detector(Enable=True)
audit_info = self.set_mocked_audit_info()
guardduty = GuardDuty(audit_info)
assert len(guardduty.detectors) == 1
assert guardduty.detectors[0].id == response["DetectorId"]
assert guardduty.detectors[0].region == AWS_REGION
assert guardduty.detectors[0].status
assert len(guardduty.detectors[0].member_accounts) == 1
@mock_guardduty
# Test GuardDuty session
def test__get_administrator_account__(self):
guardduty_client = client("guardduty", region_name=AWS_REGION)
response = guardduty_client.create_detector(Enable=True)
audit_info = self.set_mocked_audit_info()
guardduty = GuardDuty(audit_info)
assert len(guardduty.detectors) == 1
assert guardduty.detectors[0].id == response["DetectorId"]
assert guardduty.detectors[0].region == AWS_REGION
assert guardduty.detectors[0].status
assert guardduty.detectors[0].administrator_account == AWS_ACCOUNT_NUMBER_ADMIN