mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 06:45:08 +00:00
feat(): guardduty checks and service (#1492)
This commit is contained in:
0
providers/aws/services/guardduty/__init__.py
Normal file
0
providers/aws/services/guardduty/__init__.py
Normal file
@@ -1,50 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed
|
||||
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations under the License.
|
||||
CHECK_ID_extra713="7.13"
|
||||
CHECK_TITLE_extra713="[extra713] Check if GuardDuty is enabled"
|
||||
CHECK_SCORED_extra713="NOT_SCORED"
|
||||
CHECK_CIS_LEVEL_extra713="EXTRA"
|
||||
CHECK_SEVERITY_extra713="High"
|
||||
CHECK_ALTERNATE_check713="extra713"
|
||||
CHECK_ASFF_COMPLIANCE_TYPE_extra713="ens-op.mon.1.aws.duty.1"
|
||||
CHECK_ASFF_RESOURCE_TYPE_extra713="AwsGuardDutyDetector"
|
||||
CHECK_SERVICENAME_extra713="guardduty"
|
||||
CHECK_RISK_extra713='Amazon GuardDuty is a continuous security monitoring service that analyzes and processes several datasources.'
|
||||
CHECK_REMEDIATION_extra713='Enable GuardDuty and analyze its findings.'
|
||||
CHECK_DOC_extra713='https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_settingup.html'
|
||||
CHECK_CAF_EPIC_extra713='Data Protection'
|
||||
|
||||
extra713(){
|
||||
# "Check if GuardDuty is enabled "
|
||||
for regx in $REGIONS; do
|
||||
LIST_OF_GUARDDUTY_DETECTORS=$($AWSCLI guardduty list-detectors $PROFILE_OPT --region $regx --output text --query DetectorIds[*] 2> /dev/null)
|
||||
RESULT=$?
|
||||
if [ $RESULT -eq 0 ];then
|
||||
if [[ $LIST_OF_GUARDDUTY_DETECTORS ]];then
|
||||
while read -r detector;do
|
||||
DETECTOR_ENABLED=$($AWSCLI guardduty get-detector --detector-id $detector $PROFILE_OPT --region $regx --query "Status" --output text|grep ENABLED)
|
||||
if [[ $DETECTOR_ENABLED ]]; then
|
||||
textPass "$regx: GuardDuty detector $detector enabled" "$regx" "$detector"
|
||||
else
|
||||
textFail "$regx: GuardDuty detector $detector configured but suspended" "$regx" "$detector"
|
||||
fi
|
||||
done <<< "$LIST_OF_GUARDDUTY_DETECTORS"
|
||||
else
|
||||
textFail "$regx: GuardDuty detector not configured!" "$regx"
|
||||
fi
|
||||
else
|
||||
# if list-detectors return any error
|
||||
textInfo "$regx: GuardDuty not checked or Access Denied trying to get detector" "$regx"
|
||||
fi
|
||||
done
|
||||
}
|
||||
@@ -1,47 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy
|
||||
# of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed
|
||||
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations under the License.
|
||||
CHECK_ID_extra7139="7.139"
|
||||
CHECK_TITLE_extra7139="[extra7139] There are High severity GuardDuty findings "
|
||||
CHECK_SCORED_extra7139="NOT_SCORED"
|
||||
CHECK_CIS_LEVEL_extra7139="EXTRA"
|
||||
CHECK_SEVERITY_extra7139="High"
|
||||
CHECK_ASFF_RESOURCE_TYPE_extra7139="AwsGuardDutyDetector"
|
||||
CHECK_ALTERNATE_check7139="extra7139"
|
||||
CHECK_SERVICENAME_extra7139="guardduty"
|
||||
CHECK_RISK_extra7139='If critical findings are not addressed threats can spread in the environment.'
|
||||
CHECK_REMEDIATION_extra7139='Review and remediate critical GuardDuty findings as quickly as possible.'
|
||||
CHECK_DOC_extra7139='https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings.html'
|
||||
CHECK_CAF_EPIC_extra7139='Incident Response'
|
||||
extra7139(){
|
||||
|
||||
for regx in $REGIONS; do
|
||||
DETECTORS_LIST=""
|
||||
DETECTORS_LIST=$($AWSCLI guardduty list-detectors --query DetectorIds $PROFILE_OPT --region $regx --output text 2>&1)
|
||||
if [[ $(echo "$DETECTORS_LIST" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
|
||||
textInfo "$regx: Access Denied trying to list detectors" "$regx"
|
||||
continue
|
||||
fi
|
||||
if [[ $DETECTORS_LIST ]];then
|
||||
for DETECTOR in $DETECTORS_LIST;do
|
||||
FINDINGS_COUNT=""
|
||||
FINDINGS_COUNT=$($AWSCLI $PROFILE_OPT --region $regx --output text guardduty list-findings --detector-id $DETECTOR --finding-criteria '{"Criterion":{"severity": {"Eq":["8"]}, "service.archived": {"Eq": ["false"]}}}' 2> /dev/null | wc -l | xargs) # Severity LOW=2, MED=4, HIGH=8
|
||||
if [[ $FINDINGS_COUNT -gt 0 ]];then
|
||||
textFail "$regx: GuardDuty has $FINDINGS_COUNT high severity findings." "$regx"
|
||||
else
|
||||
textPass "$regx: GuardDuty has no high severity findings." "$regx"
|
||||
fi
|
||||
done
|
||||
else
|
||||
textInfo "$regx: No GuardDuty detectors found." "$regx"
|
||||
fi
|
||||
done
|
||||
}
|
||||
4
providers/aws/services/guardduty/guardduty_client.py
Normal file
4
providers/aws/services/guardduty/guardduty_client.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from providers.aws.services.guardduty.guardduty_service import GuardDuty
|
||||
|
||||
guardduty_client = GuardDuty(current_audit_info)
|
||||
@@ -0,0 +1,35 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "guardduty_is_enabled",
|
||||
"CheckTitle": "Check if GuardDuty is enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "guardduty",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:aws:sagemaker:region:account-id",
|
||||
"Severity": "high",
|
||||
"ResourceType": "AwsGuardDutyDetector",
|
||||
"Description": "Check if GuardDuty is enabled",
|
||||
"Risk": "Amazon GuardDuty is a continuous security monitoring service that analyzes and processes several datasources.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_settingup.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/GuardDuty/guardduty-enabled.html",
|
||||
"NativeIaC": "",
|
||||
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/GuardDuty/guardduty-enabled.html",
|
||||
"Terraform": "https://docs.bridgecrew.io/docs/ensure-guardduty-is-enabled-to-specific-orgregion#fix---buildtime"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable GuardDuty and analyze its findings.",
|
||||
"Url": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_settingup.html"
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"Tags": {
|
||||
"Tag1Key": "value",
|
||||
"Tag2Key": "value"
|
||||
},
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "",
|
||||
"Compliance": []
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
from lib.check.models import Check, Check_Report
|
||||
from providers.aws.services.guardduty.guardduty_client import guardduty_client
|
||||
|
||||
|
||||
class guardduty_is_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for detector in guardduty_client.detectors:
|
||||
report = Check_Report(self.metadata)
|
||||
report.region = detector.region
|
||||
report.resource_id = detector.id
|
||||
report.resource_arn = detector.arn
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"GuardDuty detector {detector.id} enabled"
|
||||
if detector.status is None:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"GuardDuty detector {detector.id} not configured"
|
||||
)
|
||||
elif not detector.status:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"GuardDuty detector {detector.id} configured but suspended"
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,104 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
from uuid import uuid4
|
||||
|
||||
from providers.aws.services.guardduty.guardduty_service import Detector
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
detector_id = str(uuid4())
|
||||
|
||||
|
||||
class Test_guardduty_is_enabled:
|
||||
def test_no_detectors(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_is_enabled.guardduty_is_enabled import (
|
||||
guardduty_is_enabled,
|
||||
)
|
||||
|
||||
check = guardduty_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_guardduty_enabled(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
guardduty_client.detectors.append(
|
||||
Detector(
|
||||
id=detector_id,
|
||||
region=AWS_REGION,
|
||||
status=True,
|
||||
)
|
||||
)
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_is_enabled.guardduty_is_enabled import (
|
||||
guardduty_is_enabled,
|
||||
)
|
||||
|
||||
check = guardduty_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search("enabled", result[0].status_extended)
|
||||
assert result[0].resource_id == detector_id
|
||||
assert result[0].resource_arn == ""
|
||||
|
||||
def test_guardduty_configured_but_suspended(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
guardduty_client.detectors.append(
|
||||
Detector(
|
||||
id=detector_id,
|
||||
region=AWS_REGION,
|
||||
status=False,
|
||||
)
|
||||
)
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_is_enabled.guardduty_is_enabled import (
|
||||
guardduty_is_enabled,
|
||||
)
|
||||
|
||||
check = guardduty_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search("configured but suspended", result[0].status_extended)
|
||||
assert result[0].resource_id == detector_id
|
||||
assert result[0].resource_arn == ""
|
||||
|
||||
def test_guardduty_not_configured(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
guardduty_client.detectors.append(
|
||||
Detector(
|
||||
id=detector_id,
|
||||
region=AWS_REGION,
|
||||
)
|
||||
)
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_is_enabled.guardduty_is_enabled import (
|
||||
guardduty_is_enabled,
|
||||
)
|
||||
|
||||
check = guardduty_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search("not configured", result[0].status_extended)
|
||||
assert result[0].resource_id == detector_id
|
||||
assert result[0].resource_arn == ""
|
||||
@@ -0,0 +1,35 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "guardduty_no_high_severity_findings",
|
||||
"CheckTitle": "There are High severity GuardDuty findings ",
|
||||
"CheckType": [],
|
||||
"ServiceName": "guardduty",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:aws:sagemaker:region:account-id",
|
||||
"Severity": "high",
|
||||
"ResourceType": "AwsGuardDutyDetector",
|
||||
"Description": "There are High severity GuardDuty findings ",
|
||||
"Risk": "If critical findings are not addressed threats can spread in the environment.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/GuardDuty/findings.html",
|
||||
"NativeIaC": "",
|
||||
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/GuardDuty/findings.html",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Review and remediate critical GuardDuty findings as quickly as possible.",
|
||||
"Url": "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings.html"
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"Tags": {
|
||||
"Tag1Key": "value",
|
||||
"Tag2Key": "value"
|
||||
},
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "",
|
||||
"Compliance": []
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
from lib.check.models import Check, Check_Report
|
||||
from providers.aws.services.guardduty.guardduty_client import guardduty_client
|
||||
|
||||
|
||||
class guardduty_no_high_severity_findings(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for detector in guardduty_client.detectors:
|
||||
report = Check_Report(self.metadata)
|
||||
report.region = detector.region
|
||||
report.resource_id = detector.id
|
||||
report.resource_arn = detector.arn
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"GuardDuty detector {detector.id} does not have high severity findings."
|
||||
if len(detector.findings) > 0:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"GuardDuty detector {detector.id} has {str(len(detector.findings))} high severity findings"
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,78 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
from uuid import uuid4
|
||||
|
||||
from providers.aws.services.guardduty.guardduty_service import Detector
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
detector_id = str(uuid4())
|
||||
|
||||
|
||||
class Test_guardduty_no_high_severity_findings:
|
||||
def test_no_detectors(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_no_high_severity_findings.guardduty_no_high_severity_findings import (
|
||||
guardduty_no_high_severity_findings,
|
||||
)
|
||||
|
||||
check = guardduty_no_high_severity_findings()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_no_high_findings(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
guardduty_client.detectors.append(
|
||||
Detector(
|
||||
id=detector_id,
|
||||
region=AWS_REGION,
|
||||
)
|
||||
)
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_no_high_severity_findings.guardduty_no_high_severity_findings import (
|
||||
guardduty_no_high_severity_findings,
|
||||
)
|
||||
|
||||
check = guardduty_no_high_severity_findings()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"does not have high severity findings.", result[0].status_extended
|
||||
)
|
||||
assert result[0].resource_id == detector_id
|
||||
assert result[0].resource_arn == ""
|
||||
|
||||
def test_high_findings(self):
|
||||
guardduty_client = mock.MagicMock
|
||||
guardduty_client.detectors = []
|
||||
guardduty_client.detectors.append(
|
||||
Detector(
|
||||
id=detector_id, region=AWS_REGION, status=False, findings=[str(uuid4())]
|
||||
)
|
||||
)
|
||||
with mock.patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.GuardDuty",
|
||||
guardduty_client,
|
||||
):
|
||||
from providers.aws.services.guardduty.guardduty_no_high_severity_findings.guardduty_no_high_severity_findings import (
|
||||
guardduty_no_high_severity_findings,
|
||||
)
|
||||
|
||||
check = guardduty_no_high_severity_findings()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search("has 1 high severity findings", result[0].status_extended)
|
||||
assert result[0].resource_id == detector_id
|
||||
assert result[0].resource_arn == ""
|
||||
98
providers/aws/services/guardduty/guardduty_service.py
Normal file
98
providers/aws/services/guardduty/guardduty_service.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import threading
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from lib.logger import logger
|
||||
from providers.aws.aws_provider import generate_regional_clients
|
||||
|
||||
|
||||
################################ GuardDuty
|
||||
class GuardDuty:
|
||||
def __init__(self, audit_info):
|
||||
self.service = "guardduty"
|
||||
self.session = audit_info.audit_session
|
||||
self.regional_clients = generate_regional_clients(self.service, audit_info)
|
||||
self.detectors = []
|
||||
self.__threading_call__(self.__list_detectors__)
|
||||
self.__get_detector__(self.regional_clients)
|
||||
self.__list_findings__(self.regional_clients)
|
||||
|
||||
def __get_session__(self):
|
||||
return self.session
|
||||
|
||||
def __threading_call__(self, call):
|
||||
threads = []
|
||||
for regional_client in self.regional_clients.values():
|
||||
threads.append(threading.Thread(target=call, args=(regional_client,)))
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
def __list_detectors__(self, regional_client):
|
||||
logger.info("GuardDuty - listing detectors...")
|
||||
try:
|
||||
list_detectors_paginator = regional_client.get_paginator("list_detectors")
|
||||
for page in list_detectors_paginator.paginate():
|
||||
for detector in page["DetectorIds"]:
|
||||
self.detectors.append(
|
||||
Detector(id=detector, region=regional_client.region)
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __get_detector__(self, regional_clients):
|
||||
logger.info("GuardDuty - getting detector info...")
|
||||
try:
|
||||
for detector in self.detectors:
|
||||
regional_client = regional_clients[detector.region]
|
||||
detector_info = regional_client.get_detector(DetectorId=detector.id)
|
||||
if "Status" in detector_info and detector_info["Status"] == "ENABLED":
|
||||
detector.status = True
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def __list_findings__(self, regional_clients):
|
||||
logger.info("GuardDuty - listing findings...")
|
||||
try:
|
||||
for detector in self.detectors:
|
||||
regional_client = regional_clients[detector.region]
|
||||
list_findings_paginator = regional_client.get_paginator("list_findings")
|
||||
for page in list_findings_paginator.paginate(
|
||||
DetectorId=detector.id,
|
||||
FindingCriteria={
|
||||
"Criterion": {
|
||||
"severity": {
|
||||
"Eq": [
|
||||
"8",
|
||||
],
|
||||
},
|
||||
"service.archived": {
|
||||
"Eq": [
|
||||
"false",
|
||||
],
|
||||
},
|
||||
}
|
||||
},
|
||||
):
|
||||
for finding in page["FindingIds"]:
|
||||
detector.findings.append(finding)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class Detector(BaseModel):
|
||||
id: str
|
||||
# there is no arn for a guardduty detector but we want it filled for the reports
|
||||
arn: str = ""
|
||||
region: str
|
||||
status: bool = None
|
||||
findings: list = []
|
||||
114
providers/aws/services/guardduty/guardduty_service_test.py
Normal file
114
providers/aws/services/guardduty/guardduty_service_test.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import botocore
|
||||
from boto3 import client, session
|
||||
from moto import mock_guardduty
|
||||
|
||||
from providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
from providers.aws.services.guardduty.guardduty_service import GuardDuty
|
||||
|
||||
AWS_ACCOUNT_NUMBER = 123456789012
|
||||
AWS_REGION = "eu-west-1"
|
||||
|
||||
make_api_call = botocore.client.BaseClient._make_api_call
|
||||
|
||||
|
||||
def mock_make_api_call(self, operation_name, kwarg):
|
||||
if operation_name == "ListFindings":
|
||||
return {"FindingIds": ["86c1d16c9ec63f634ccd087ae0d427ba1"]}
|
||||
return make_api_call(self, operation_name, kwarg)
|
||||
|
||||
|
||||
def mock_generate_regional_clients(service, audit_info):
|
||||
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
|
||||
regional_client.region = AWS_REGION
|
||||
return {AWS_REGION: regional_client}
|
||||
|
||||
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
@patch(
|
||||
"providers.aws.services.guardduty.guardduty_service.generate_regional_clients",
|
||||
new=mock_generate_regional_clients,
|
||||
)
|
||||
class Test_GuardDuty_Service:
|
||||
# Mocked Audit Info
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=None,
|
||||
organizations_metadata=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
# Test GuardDuty Service
|
||||
def test_service(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
assert guardduty.service == "guardduty"
|
||||
|
||||
# Test GuardDuty client
|
||||
def test_client(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
for reg_client in guardduty.regional_clients.values():
|
||||
assert reg_client.__class__.__name__ == "GuardDuty"
|
||||
|
||||
# Test GuardDuty session
|
||||
def test__get_session__(self):
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
assert guardduty.session.__class__.__name__ == "Session"
|
||||
|
||||
@mock_guardduty
|
||||
# Test GuardDuty session
|
||||
def test__list_detectors__(self):
|
||||
guardduty_client = client("guardduty", region_name=AWS_REGION)
|
||||
response = guardduty_client.create_detector(Enable=True)
|
||||
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
|
||||
assert len(guardduty.detectors) == 1
|
||||
assert guardduty.detectors[0].id == response["DetectorId"]
|
||||
assert guardduty.detectors[0].region == AWS_REGION
|
||||
|
||||
@mock_guardduty
|
||||
# Test GuardDuty session
|
||||
def test__get_detector__(self):
|
||||
guardduty_client = client("guardduty", region_name=AWS_REGION)
|
||||
response = guardduty_client.create_detector(Enable=True)
|
||||
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
|
||||
assert len(guardduty.detectors) == 1
|
||||
assert guardduty.detectors[0].id == response["DetectorId"]
|
||||
assert guardduty.detectors[0].region == AWS_REGION
|
||||
assert guardduty.detectors[0].status
|
||||
|
||||
@mock_guardduty
|
||||
# Test GuardDuty session
|
||||
def test__list_findings__(self):
|
||||
guardduty_client = client("guardduty", region_name=AWS_REGION)
|
||||
response = guardduty_client.create_detector(Enable=True)
|
||||
|
||||
audit_info = self.set_mocked_audit_info()
|
||||
guardduty = GuardDuty(audit_info)
|
||||
|
||||
assert len(guardduty.detectors) == 1
|
||||
assert guardduty.detectors[0].id == response["DetectorId"]
|
||||
assert guardduty.detectors[0].region == AWS_REGION
|
||||
assert guardduty.detectors[0].status
|
||||
assert len(guardduty.detectors[0].findings) == 1
|
||||
Reference in New Issue
Block a user