mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
feat(outputs): Unify classes to generate outputs dynamically based on the provider (#1545)
Co-authored-by: n4ch04 <nachor1992@gmail.com> Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
@@ -13,13 +13,11 @@ from prowler.config.config import (
|
||||
orange_color,
|
||||
output_file_timestamp,
|
||||
prowler_version,
|
||||
timestamp_iso,
|
||||
timestamp_utc,
|
||||
)
|
||||
from prowler.lib.check.models import Check_Report, load_check_metadata
|
||||
from prowler.lib.outputs.models import (
|
||||
Check_Output_CSV,
|
||||
Check_Output_JSON,
|
||||
Check_Output_JSON_ASFF,
|
||||
Compliance,
|
||||
ProductFields,
|
||||
@@ -28,7 +26,6 @@ from prowler.lib.outputs.models import (
|
||||
)
|
||||
from prowler.lib.outputs.outputs import (
|
||||
fill_file_descriptors,
|
||||
fill_json,
|
||||
fill_json_asff,
|
||||
generate_csv_fields,
|
||||
send_to_s3_bucket,
|
||||
@@ -141,19 +138,11 @@ class Test_Outputs:
|
||||
)
|
||||
assert exc.type == Exception
|
||||
|
||||
def test_generate_csv_fields(self):
|
||||
def test_generate_common_csv_fields(self):
|
||||
expected = [
|
||||
"assessment_start_time",
|
||||
"finding_unique_id",
|
||||
"provider",
|
||||
"profile",
|
||||
"account_id",
|
||||
"account_name",
|
||||
"account_email",
|
||||
"account_arn",
|
||||
"account_org",
|
||||
"account_tags",
|
||||
"region",
|
||||
"check_id",
|
||||
"check_title",
|
||||
"check_type",
|
||||
@@ -162,8 +151,6 @@ class Test_Outputs:
|
||||
"service_name",
|
||||
"subservice_name",
|
||||
"severity",
|
||||
"resource_id",
|
||||
"resource_arn",
|
||||
"resource_type",
|
||||
"resource_details",
|
||||
"resource_tags",
|
||||
@@ -180,54 +167,53 @@ class Test_Outputs:
|
||||
"depends_on",
|
||||
"related_to",
|
||||
"notes",
|
||||
# "compliance",
|
||||
]
|
||||
|
||||
assert generate_csv_fields(Check_Output_CSV) == expected
|
||||
|
||||
def test_fill_json(self):
|
||||
input_audit_info = AWS_Audit_Info(
|
||||
original_session=None,
|
||||
audit_session=None,
|
||||
audited_account="123456789012",
|
||||
audited_identity_arn="test-arn",
|
||||
audited_user_id="test",
|
||||
audited_partition="aws",
|
||||
profile="default",
|
||||
profile_region="eu-west-1",
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["eu-west-2", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
)
|
||||
finding = Check_Report(
|
||||
load_check_metadata(
|
||||
f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
|
||||
).json()
|
||||
)
|
||||
finding.resource_details = "Test resource details"
|
||||
finding.resource_id = "test-resource"
|
||||
finding.resource_arn = "test-arn"
|
||||
finding.region = "eu-west-1"
|
||||
finding.status = "PASS"
|
||||
finding.status_extended = "This is a test"
|
||||
# def test_fill_json(self):
|
||||
# input_audit_info = AWS_Audit_Info(
|
||||
# original_session=None,
|
||||
# audit_session=None,
|
||||
# audited_account="123456789012",
|
||||
# audited_identity_arn="test-arn",
|
||||
# audited_user_id="test",
|
||||
# audited_partition="aws",
|
||||
# profile="default",
|
||||
# profile_region="eu-west-1",
|
||||
# credentials=None,
|
||||
# assumed_role_info=None,
|
||||
# audited_regions=["eu-west-2", "eu-west-1"],
|
||||
# organizations_metadata=None,
|
||||
# )
|
||||
# finding = Check_Report(
|
||||
# load_check_metadata(
|
||||
# f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
|
||||
# ).json()
|
||||
# )
|
||||
# finding.resource_details = "Test resource details"
|
||||
# finding.resource_id = "test-resource"
|
||||
# finding.resource_arn = "test-arn"
|
||||
# finding.region = "eu-west-1"
|
||||
# finding.status = "PASS"
|
||||
# finding.status_extended = "This is a test"
|
||||
|
||||
input = Check_Output_JSON(**finding.check_metadata.dict())
|
||||
# input = Check_Output_JSON(**finding.check_metadata.dict())
|
||||
|
||||
expected = Check_Output_JSON(**finding.check_metadata.dict())
|
||||
expected.AssessmentStartTime = timestamp_iso
|
||||
expected.FindingUniqueId = ""
|
||||
expected.Profile = "default"
|
||||
expected.AccountId = "123456789012"
|
||||
expected.OrganizationsInfo = None
|
||||
expected.Region = "eu-west-1"
|
||||
expected.Status = "PASS"
|
||||
expected.StatusExtended = "This is a test"
|
||||
expected.ResourceId = "test-resource"
|
||||
expected.ResourceArn = "test-arn"
|
||||
expected.ResourceDetails = "Test resource details"
|
||||
# expected = Check_Output_JSON(**finding.check_metadata.dict())
|
||||
# expected.AssessmentStartTime = timestamp_iso
|
||||
# expected.FindingUniqueId = ""
|
||||
# expected.Profile = "default"
|
||||
# expected.AccountId = "123456789012"
|
||||
# expected.OrganizationsInfo = None
|
||||
# expected.Region = "eu-west-1"
|
||||
# expected.Status = "PASS"
|
||||
# expected.StatusExtended = "This is a test"
|
||||
# expected.ResourceId = "test-resource"
|
||||
# expected.ResourceArn = "test-arn"
|
||||
# expected.ResourceDetails = "Test resource details"
|
||||
|
||||
assert fill_json(input, input_audit_info, finding) == expected
|
||||
# assert fill_json(input, input_audit_info, finding) == expected
|
||||
|
||||
def test_fill_json_asff(self):
|
||||
input_audit_info = AWS_Audit_Info(
|
||||
|
||||
Reference in New Issue
Block a user