fix(outputs): apply -q to security hub (#1637)

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Sergio Garcia
2023-01-02 15:56:49 +01:00
committed by GitHub
parent 79c09e613b
commit 8db86992aa
3 changed files with 140 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ from os import path, remove
from unittest import mock
import boto3
import botocore
import pytest
from colorama import Fore
from moto import mock_s3
@@ -35,16 +36,41 @@ from prowler.lib.outputs.outputs import (
)
from prowler.lib.utils.utils import hash_sha512, open_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.lib.security_hub.security_hub import send_to_security_hub
AWS_ACCOUNT_ID = "123456789012"
# Mocking Security Hub Get Findings
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "BatchImportFindings":
return {
"FailedCount": 0,
"SuccessCount": 1,
}
if operation_name == "DescribeHub":
return {
"HubArn": "test-hub",
}
if operation_name == "ListEnabledProductsForImport":
return {
"ProductSubscriptions": [
"prowler/prowler",
],
}
return make_api_call(self, operation_name, kwarg)
class Test_Outputs:
def test_fill_file_descriptors(self):
audited_account = "123456789012"
audited_account = AWS_ACCOUNT_ID
output_directory = f"{os.path.dirname(os.path.realpath(__file__))}"
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=None,
audited_account="123456789012",
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
@@ -177,7 +203,7 @@ class Test_Outputs:
# input_audit_info = AWS_Audit_Info(
# original_session=None,
# audit_session=None,
# audited_account="123456789012",
# audited_account=AWS_ACCOUNT_ID,
# audited_identity_arn="test-arn",
# audited_user_id="test",
# audited_partition="aws",
@@ -206,7 +232,7 @@ class Test_Outputs:
# expected.AssessmentStartTime = timestamp_iso
# expected.FindingUniqueId = ""
# expected.Profile = "default"
# expected.AccountId = "123456789012"
# expected.AccountId = AWS_ACCOUNT_ID
# expected.OrganizationsInfo = None
# expected.Region = "eu-west-1"
# expected.Status = "PASS"
@@ -221,7 +247,7 @@ class Test_Outputs:
input_audit_info = AWS_Audit_Info(
original_session=None,
audit_session=None,
audited_account="123456789012",
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
@@ -253,7 +279,7 @@ class Test_Outputs:
ProviderVersion=prowler_version, ProwlerResourceName="test-resource"
)
expected.GeneratorId = "prowler-" + finding.check_metadata.CheckID
expected.AwsAccountId = "123456789012"
expected.AwsAccountId = AWS_ACCOUNT_ID
expected.Types = finding.check_metadata.CheckType
expected.FirstObservedAt = (
expected.UpdatedAt
@@ -290,7 +316,7 @@ class Test_Outputs:
input_audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session,
audited_account="123456789012",
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
@@ -382,3 +408,61 @@ class Test_Outputs:
assert stats["total_fail"] == 0
assert stats["resources_count"] == 0
assert stats["findings_count"] == 0
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_send_to_security_hub(self):
# Create mock session
session = boto3.session.Session(
region_name="eu-west-1",
)
input_audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session,
audited_account=AWS_ACCOUNT_ID,
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
profile="default",
profile_region="eu-west-1",
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
)
finding = Check_Report(
load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
).json()
)
finding.resource_details = "Test resource details"
finding.resource_id = "test-resource"
finding.resource_arn = "test-arn"
finding.region = "eu-west-1"
finding.status = "PASS"
finding.status_extended = "This is a test"
finding_output = Check_Output_JSON_ASFF()
fill_json_asff(finding_output, input_audit_info, finding)
assert (
send_to_security_hub(
False,
finding.status,
finding.region,
finding_output,
input_audit_info.audit_session,
)
== 1
)
# Setting is_quiet to True
assert (
send_to_security_hub(
True,
finding.status,
finding.region,
finding_output,
input_audit_info.audit_session,
)
== 0
)