From 182d0381c3eb9ba4b42728ce68e143ff97a06c38 Mon Sep 17 00:00:00 2001 From: Nacho Rivera <59198746+n4ch04@users.noreply.github.com> Date: Tue, 23 Aug 2022 11:51:40 +0200 Subject: [PATCH] chore(tests): Add tests to output generation (#1340) * chore(tests): added tests to outputs * fix(timestamp): change timestamp coming from config --- config/config.py | 1 + lib/outputs/__init__.py | 0 lib/outputs/fixtures/metadata.json | 56 +++++++ lib/outputs/outputs.py | 4 +- lib/outputs/outputs_test.py | 258 +++++++++++++++++++++++++++++ 5 files changed, 317 insertions(+), 2 deletions(-) create mode 100644 lib/outputs/__init__.py create mode 100644 lib/outputs/fixtures/metadata.json create mode 100644 lib/outputs/outputs_test.py diff --git a/config/config.py b/config/config.py index d354ebe2..3e1bf281 100644 --- a/config/config.py +++ b/config/config.py @@ -14,6 +14,7 @@ aws_services_json_file = "providers/aws/aws_regions_by_service.json" default_output_directory = getcwd() + "/output" output_file_timestamp = timestamp.strftime("%Y%m%d%H%M%S") +timestamp_iso = timestamp.isoformat() csv_file_suffix = f"{output_file_timestamp}.csv" json_file_suffix = f"{output_file_timestamp}.json" json_asff_file_suffix = f"{output_file_timestamp}.asff.json" diff --git a/lib/outputs/__init__.py b/lib/outputs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/outputs/fixtures/metadata.json b/lib/outputs/fixtures/metadata.json new file mode 100644 index 00000000..167ffed2 --- /dev/null +++ b/lib/outputs/fixtures/metadata.json @@ -0,0 +1,56 @@ +{ + "Categories": [ + "cat1", + "cat2" + ], + "CheckID": "iam_disable_30_days_credentials", + "CheckTitle": "Ensure credentials unused for 30 days or greater are disabled", + "CheckType": "Software and Configuration Checks", + "Compliance": [ + { + "Control": [ + "4.4" + ], + "Framework": "CIS-AWS", + "Group": [ + "level1", + "level2" + ], + "Version": "1.4" + } + ], + "DependsOn": [ + "othercheck1", + "othercheck2" + ], + "Description": "Ensure credentials unused for 30 days or greater are disabled", + "Notes": "additional information", + "Provider": "aws", + "RelatedTo": [ + "othercheck3", + "othercheck4" + ], + "RelatedUrl": "https://serviceofficialsiteorpageforthissubject", + "Remediation": { + "Code": { + "CLI": "cli command or URL to the cli command location.", + "NativeIaC": "code or URL to the code location.", + "Other": "cli command or URL to the cli command location.", + "Terraform": "code or URL to the code location." + }, + "Recommendation": { + "Text": "Run sudo yum update and cross your fingers and toes.", + "Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html" + } + }, + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "ResourceType": "AwsIamAccessAnalyzer", + "Risk": "Risk associated.", + "ServiceName": "iam", + "Severity": "low", + "SubServiceName": "accessanalyzer", + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + } +} diff --git a/lib/outputs/outputs.py b/lib/outputs/outputs.py index c2263055..01bf3ed7 100644 --- a/lib/outputs/outputs.py +++ b/lib/outputs/outputs.py @@ -9,7 +9,7 @@ from config.config import ( json_asff_file_suffix, json_file_suffix, prowler_version, - timestamp, + timestamp_iso, timestamp_utc, ) from lib.outputs.models import ( @@ -184,7 +184,7 @@ def generate_csv_fields(): def fill_json(finding_output, audit_info, finding): - finding_output.AssessmentStartTime = timestamp.isoformat() + finding_output.AssessmentStartTime = timestamp_iso finding_output.FindingUniqueId = "" finding_output.Profile = audit_info.profile finding_output.AccountId = audit_info.audited_account diff --git a/lib/outputs/outputs_test.py b/lib/outputs/outputs_test.py new file mode 100644 index 00000000..2b6c2529 --- /dev/null +++ b/lib/outputs/outputs_test.py @@ -0,0 +1,258 @@ +from os import path, remove + +from colorama import Fore + +from config.config import ( + csv_file_suffix, + json_asff_file_suffix, + json_file_suffix, + prowler_version, + timestamp_iso, + timestamp_utc, +) +from lib.check.models import Check_Report, load_check_metadata +from lib.outputs.models import ( + Check_Output_JSON, + Check_Output_JSON_ASFF, + Compliance, + ProductFields, + Resource, + Severity, +) +from lib.outputs.outputs import ( + fill_file_descriptors, + fill_json, + fill_json_asff, + generate_csv_fields, + set_report_color, +) +from lib.utils.utils import hash_sha512, open_file +from providers.aws.models import AWS_Audit_Info + + +class Test_Outputs: + def test_fill_file_descriptors(self): + audited_account = "123456789012" + output_directory = "." + csv_fields = generate_csv_fields() + test_output_modes = [ + ["csv"], + ["json"], + ["json-asff"], + ["csv", "json"], + ["csv", "json", "json-asff"], + ] + + expected = [ + { + "csv": open_file( + f"{output_directory}/prowler-output-{audited_account}-{csv_file_suffix}", + "a", + ) + }, + { + "json": open_file( + f"{output_directory}/prowler-output-{audited_account}-{json_file_suffix}", + "a", + ) + }, + { + "json-asff": open_file( + f"{output_directory}/prowler-output-{audited_account}-{json_asff_file_suffix}", + "a", + ) + }, + { + "csv": open_file( + f"{output_directory}/prowler-output-{audited_account}-{csv_file_suffix}", + "a", + ), + "json": open_file( + f"{output_directory}/prowler-output-{audited_account}-{json_file_suffix}", + "a", + ), + }, + { + "csv": open_file( + f"{output_directory}/prowler-output-{audited_account}-{csv_file_suffix}", + "a", + ), + "json": open_file( + f"{output_directory}/prowler-output-{audited_account}-{json_file_suffix}", + "a", + ), + "json-asff": open_file( + f"{output_directory}/prowler-output-{audited_account}-{json_asff_file_suffix}", + "a", + ), + }, + ] + + for index, output_mode_list in enumerate(test_output_modes): + test_output_file_descriptors = fill_file_descriptors( + output_mode_list, audited_account, output_directory, csv_fields + ) + for output_mode in output_mode_list: + assert ( + test_output_file_descriptors[output_mode].name + == expected[index][output_mode].name + ) + remove(expected[index][output_mode].name) + + def test_set_report_color(self): + test_status = ["PASS", "FAIL", "ERROR", "WARNING"] + test_colors = [Fore.GREEN, Fore.RED, Fore.BLACK, Fore.YELLOW] + + for status in test_status: + assert set_report_color(status) in test_colors + + def test_generate_csv_fields(self): + expected = [ + "assessment_start_time", + "finding_unique_id", + "provider", + "profile", + "account_id", + "account_name", + "account_email", + "account_arn", + "account_org", + "account_tags", + "region", + "check_id", + "check_title", + "check_type", + "status", + "status_extended", + "service_name", + "subservice_name", + "severity", + "resource_id", + "resource_arn", + "resource_type", + "resource_details", + "resource_tags", + "description", + "risk", + "related_url", + "remediation_recommendation_text", + "remediation_recommendation_url", + "remediation_recommendation_code_nativeiac", + "remediation_recommendation_code_terraform", + "remediation_recommendation_code_cli", + "remediation_recommendation_code_other", + "categories", + "depends_on", + "related_to", + "notes", + "compliance", + ] + + assert generate_csv_fields() == expected + + def test_fill_json(self): + input_audit_info = AWS_Audit_Info( + original_session=None, + audit_session=None, + audited_account="123456789012", + audited_identity_arn="test-arn", + audited_user_id="test", + audited_partition="aws", + profile="default", + profile_region="eu-west-1", + credentials=None, + assumed_role_info=None, + audited_regions=["eu-west-2", "eu-west-1"], + organizations_metadata=None, + ) + finding = Check_Report( + load_check_metadata( + f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json" + ) + ) + finding.resource_details = "Test resource details" + finding.resource_id = "test-resource" + finding.resource_arn = "test-arn" + finding.region = "eu-west-1" + finding.status = "PASS" + finding.status_extended = "This is a test" + + input = Check_Output_JSON(**finding.check_metadata.dict()) + + expected = Check_Output_JSON(**finding.check_metadata.dict()) + expected.AssessmentStartTime = timestamp_iso + expected.FindingUniqueId = "" + expected.Profile = "default" + expected.AccountId = "123456789012" + expected.OrganizationsInfo = None + expected.Region = "eu-west-1" + expected.Status = "PASS" + expected.StatusExtended = "This is a test" + expected.ResourceId = "test-resource" + expected.ResourceArn = "test-arn" + expected.ResourceDetails = "Test resource details" + + assert fill_json(input, input_audit_info, finding) == expected + + def test_fill_json_asff(self): + input_audit_info = AWS_Audit_Info( + original_session=None, + audit_session=None, + audited_account="123456789012", + audited_identity_arn="test-arn", + audited_user_id="test", + audited_partition="aws", + profile="default", + profile_region="eu-west-1", + credentials=None, + assumed_role_info=None, + audited_regions=["eu-west-2", "eu-west-1"], + organizations_metadata=None, + ) + finding = Check_Report( + load_check_metadata( + f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json" + ) + ) + finding.resource_details = "Test resource details" + finding.resource_id = "test-resource" + finding.resource_arn = "test-arn" + finding.region = "eu-west-1" + finding.status = "PASS" + finding.status_extended = "This is a test" + + input = Check_Output_JSON_ASFF() + + expected = Check_Output_JSON_ASFF() + expected.Id = f"prowler-{finding.check_metadata.CheckID}-123456789012-eu-west-1-{hash_sha512('test-resource')}" + expected.ProductArn = f"arn:aws:securityhub:eu-west-1::product/prowler/prowler" + expected.ProductFields = ProductFields( + ProviderVersion=prowler_version, ProwlerResourceName="test-resource" + ) + expected.GeneratorId = "prowler-" + finding.check_metadata.CheckID + expected.AwsAccountId = "123456789012" + expected.Types = [finding.check_metadata.CheckType] + expected.FirstObservedAt = ( + expected.UpdatedAt + ) = expected.CreatedAt = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + expected.Severity = Severity(Label=finding.check_metadata.Severity.upper()) + expected.Title = finding.check_metadata.CheckTitle + expected.Description = finding.check_metadata.Description + expected.Resources = [ + Resource( + Id="test-resource", + Type=finding.check_metadata.ResourceType, + Partition="aws", + Region="eu-west-1", + ) + ] + + expected.Compliance = Compliance( + Status="PASS" + "ED", + RelatedRequirements=[finding.check_metadata.CheckType], + ) + expected.Remediation = { + "Recommendation": finding.check_metadata.Remediation.Recommendation + } + + assert fill_json_asff(input, input_audit_info, finding) == expected