Files
prowler/tests/providers/aws/lib/security_hub/security_hub_test.py
2024-02-28 10:03:43 +01:00

401 lines
15 KiB
Python

from logging import ERROR, WARNING
from os import path
import botocore
from boto3 import session
from botocore.client import ClientError
from mock import MagicMock, patch
from prowler.config.config import prowler_version, timestamp_utc
from prowler.lib.check.models import Check_Report, load_check_metadata
# from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.lib.security_hub.security_hub import (
batch_send_to_security_hub,
prepare_security_hub_findings,
verify_security_hub_integration_enabled_per_region,
)
from tests.providers.aws.audit_info_utils import (
AWS_ACCOUNT_NUMBER,
AWS_COMMERCIAL_PARTITION,
AWS_REGION_EU_WEST_1,
AWS_REGION_EU_WEST_2,
set_mocked_aws_audit_info,
)
def get_security_hub_finding(status: str):
    """Return the ASFF finding dict the tests expect for the fixture check.

    Args:
        status: value placed under ``Compliance.Status`` (the tests in this
            file pass "PASSED" or "FAILED").

    Returns:
        A dict in AWS Security Finding Format for the
        ``iam_user_accesskey_unused`` check in ``AWS_REGION_EU_WEST_1``.
    """
    # The three timestamp fields all carry the same formatted value.
    observed_at = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

    product_fields = {
        "ProviderName": "Prowler",
        "ProviderVersion": prowler_version,
        "ProwlerResourceName": "test",
    }
    resource = {
        "Type": "AwsIamAccessAnalyzer",
        "Id": "test",
        "Partition": "aws",
        "Region": f"{AWS_REGION_EU_WEST_1}",
    }
    compliance = {
        "Status": status,
        "RelatedRequirements": [],
        "AssociatedStandards": [],
    }
    recommendation = {
        "Text": "Run sudo yum update and cross your fingers and toes.",
        "Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html",
    }

    return {
        "SchemaVersion": "2018-10-08",
        "Id": f"prowler-iam_user_accesskey_unused-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-ee26b0dd4",
        "ProductArn": f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
        "RecordState": "ACTIVE",
        "ProductFields": product_fields,
        "GeneratorId": "prowler-iam_user_accesskey_unused",
        "AwsAccountId": f"{AWS_ACCOUNT_NUMBER}",
        "Types": ["Software and Configuration Checks"],
        "FirstObservedAt": observed_at,
        "UpdatedAt": observed_at,
        "CreatedAt": observed_at,
        "Severity": {"Label": "LOW"},
        "Title": "Ensure Access Keys unused are disabled",
        "Description": "test",
        "Resources": [resource],
        "Compliance": compliance,
        "Remediation": {"Recommendation": recommendation},
    }
# Keep a reference to botocore's real API dispatcher so that
# mock_make_api_call can delegate every operation it does not stub.
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
    """Stand-in for ``BaseClient._make_api_call`` used via ``patch`` in the tests.

    Returns a canned response for ``BatchImportFindings``, ``DescribeHub`` and
    ``ListEnabledProductsForImport``; any other operation is forwarded to the
    original ``make_api_call`` saved at module level.
    """
    canned_responses = {
        "BatchImportFindings": {
            "FailedCount": 0,
            "SuccessCount": 1,
        },
        "DescribeHub": {
            "HubArn": f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:hub/default",
            "SubscribedAt": "2023-02-07T09:45:43.742Z",
            "AutoEnableControls": True,
            "ControlFindingGenerator": "STANDARD_CONTROL",
        },
        "ListEnabledProductsForImport": {
            "ProductSubscriptions": [
                f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:product-subscription/prowler/prowler",
            ]
        },
    }
    if operation_name in canned_responses:
        return canned_responses[operation_name]

    # Not one of the stubbed operations: use the real implementation.
    return make_api_call(self, operation_name, kwarg)
class Test_SecurityHub:
    """Tests for the Security Hub helpers.

    Covers verify_security_hub_integration_enabled_per_region,
    prepare_security_hub_findings and batch_send_to_security_hub.
    """

    def generate_finding(self, status, region):
        """Build a Check_Report from the fixture metadata with the given status and region."""
        metadata_file = (
            f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
        )
        finding = Check_Report(load_check_metadata(metadata_file).json())
        finding.status = status
        finding.status_extended = "test"
        finding.resource_id = "test"
        finding.resource_arn = "test"
        finding.region = region

        return finding

    def set_mocked_output_options(
        self, is_quiet: bool = False, send_sh_only_fails: bool = False
    ):
        """Return a fresh mocked output-options object.

        Args:
            is_quiet: value for the ``is_quiet`` attribute.
            send_sh_only_fails: value for the ``send_sh_only_fails`` attribute.

        Returns:
            A new MagicMock instance with ``bulk_checks_metadata`` set to an
            empty dict and the two flags set as requested.
        """
        # Instantiate the mock. Setting attributes on the MagicMock class
        # itself would share them between calls and leak them into every
        # other MagicMock created during the test session.
        output_options = MagicMock()
        output_options.bulk_checks_metadata = {}
        output_options.is_quiet = is_quiet
        output_options.send_sh_only_fails = send_sh_only_fails

        return output_options

    def set_mocked_session(self, region):
        """Create a boto3 session bound to ``region``."""
        return session.Session(
            region_name=region,
        )

    @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
    def test_verify_security_hub_integration_enabled_per_region(self):
        """With the Security Hub API calls stubbed, the integration is reported as enabled."""
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        assert verify_security_hub_integration_enabled_per_region(
            AWS_COMMERCIAL_PARTITION,
            AWS_REGION_EU_WEST_1,
            aws_session,
            AWS_ACCOUNT_NUMBER,
        )

    def test_verify_security_hub_integration_enabled_per_region_security_hub_disabled(
        self, caplog
    ):
        """An InvalidAccessException yields a falsy result and a single WARNING record."""
        caplog.set_level(WARNING)
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        with patch(
            "prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
        ) as mock_security_hub:
            error_message = f"Account {AWS_ACCOUNT_NUMBER} is not subscribed to AWS Security Hub in region {AWS_REGION_EU_WEST_1}"
            error_code = "InvalidAccessException"
            error_response = {
                "Error": {
                    "Code": error_code,
                    "Message": error_message,
                }
            }
            operation_name = "DescribeHub"
            # Creating the client raises the error.
            mock_security_hub.side_effect = ClientError(error_response, operation_name)

            assert not verify_security_hub_integration_enabled_per_region(
                AWS_COMMERCIAL_PARTITION,
                AWS_REGION_EU_WEST_1,
                aws_session,
                AWS_ACCOUNT_NUMBER,
            )
            # NOTE(review): "[68]" is presumably the line number reported by
            # the code under test; confirm when that module changes.
            assert caplog.record_tuples == [
                (
                    "root",
                    WARNING,
                    f"ClientError -- [68]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
                )
            ]

    def test_verify_security_hub_integration_enabled_per_region_prowler_not_subscribed(
        self, caplog
    ):
        """When the Prowler integration is not found, the result is falsy and a WARNING is logged."""
        caplog.set_level(WARNING)
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        with patch(
            "prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
        ) as mock_security_hub:
            # NOTE(review): these return values are set on the patched
            # `client` callable, not on `mock_security_hub.return_value`, so
            # the object returned by session.client(...) is presumably an
            # unconfigured MagicMock. Confirm this is intended.
            mock_security_hub.describe_hub.return_value = None
            mock_security_hub.list_enabled_products_for_import.return_value = []

            assert not verify_security_hub_integration_enabled_per_region(
                AWS_COMMERCIAL_PARTITION,
                AWS_REGION_EU_WEST_1,
                aws_session,
                AWS_ACCOUNT_NUMBER,
            )
            assert caplog.record_tuples == [
                (
                    "root",
                    WARNING,
                    f"Security Hub is enabled in {AWS_REGION_EU_WEST_1} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/",
                )
            ]

    def test_verify_security_hub_integration_enabled_per_region_another_ClientError(
        self, caplog
    ):
        """A ClientError with another error code yields a falsy result and an ERROR record."""
        caplog.set_level(WARNING)
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        with patch(
            "prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
        ) as mock_security_hub:
            error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
            error_code = "AnotherException"
            error_response = {
                "Error": {
                    "Code": error_code,
                    "Message": error_message,
                }
            }
            operation_name = "DescribeHub"
            mock_security_hub.side_effect = ClientError(error_response, operation_name)

            assert not verify_security_hub_integration_enabled_per_region(
                AWS_COMMERCIAL_PARTITION,
                AWS_REGION_EU_WEST_1,
                aws_session,
                AWS_ACCOUNT_NUMBER,
            )
            assert caplog.record_tuples == [
                (
                    "root",
                    ERROR,
                    f"ClientError -- [68]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
                )
            ]

    def test_verify_security_hub_integration_enabled_per_region_another_Exception(
        self, caplog
    ):
        """A generic Exception yields a falsy result and an ERROR record."""
        caplog.set_level(WARNING)
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        with patch(
            "prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
        ) as mock_security_hub:
            error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
            mock_security_hub.side_effect = Exception(error_message)

            assert not verify_security_hub_integration_enabled_per_region(
                AWS_COMMERCIAL_PARTITION,
                AWS_REGION_EU_WEST_1,
                aws_session,
                AWS_ACCOUNT_NUMBER,
            )
            assert caplog.record_tuples == [
                (
                    "root",
                    ERROR,
                    f"Exception -- [68]: {error_message}",
                )
            ]

    def test_prepare_security_hub_findings_enabled_region_not_quiet(self):
        """A PASS finding in an enabled region is kept as PASSED when quiet mode is off."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=False)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {
            AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
        }

    def test_prepare_security_hub_findings_quiet_INFO_finding(self):
        """An INFO finding produces no Security Hub finding for its region."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=False)
        findings = [self.generate_finding("INFO", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: []}

    def test_prepare_security_hub_findings_disabled_region(self):
        """A finding from a region that is not enabled is dropped."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=False)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_2)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: []}

    def test_prepare_security_hub_findings_quiet_PASS(self):
        """In quiet mode a PASS finding is dropped."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=True)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: []}

    def test_prepare_security_hub_findings_quiet_FAIL(self):
        """In quiet mode a FAIL finding is kept as FAILED."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=True)
        findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}

    def test_prepare_security_hub_findings_send_sh_only_fails_PASS(self):
        """With send_sh_only_fails set, a PASS finding is dropped."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(send_sh_only_fails=True)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: []}

    def test_prepare_security_hub_findings_send_sh_only_fails_FAIL(self):
        """With send_sh_only_fails set, a FAIL finding is kept as FAILED."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(send_sh_only_fails=True)
        findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}

    def test_prepare_security_hub_findings_no_audited_regions(self):
        """Without explicit audited regions, a PASS finding in an enabled region is still kept."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=False)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info()

        assert prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        ) == {
            AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
        }

    @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
    def test_batch_send_to_security_hub_one_finding(self):
        """Sending one prepared finding through the stubbed API reports one success."""
        enabled_regions = [AWS_REGION_EU_WEST_1]
        output_options = self.set_mocked_output_options(is_quiet=False)
        findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
        audit_info = set_mocked_aws_audit_info(
            audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
        )
        aws_session = self.set_mocked_session(AWS_REGION_EU_WEST_1)

        security_hub_findings = prepare_security_hub_findings(
            findings,
            audit_info,
            output_options,
            enabled_regions,
        )

        assert (
            batch_send_to_security_hub(
                security_hub_findings,
                aws_session,
            )
            == 1
        )