diff --git a/prowler/lib/outputs/json.py b/prowler/lib/outputs/json.py index 6713a132..33183738 100644 --- a/prowler/lib/outputs/json.py +++ b/prowler/lib/outputs/json.py @@ -34,60 +34,67 @@ from prowler.lib.utils.utils import hash_sha512, open_file def fill_json_asff(finding_output, audit_info, finding, output_options): - # Check if there are no resources in the finding - if finding.resource_arn == "": - if finding.resource_id == "": - finding.resource_id = "NONE_PROVIDED" - finding.resource_arn = finding.resource_id - finding_output.Id = f"prowler-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{hash_sha512(finding.resource_id)}" - finding_output.ProductArn = f"arn:{audit_info.audited_partition}:securityhub:{finding.region}::product/prowler/prowler" - finding_output.ProductFields = ProductFields( - ProviderVersion=prowler_version, ProwlerResourceName=finding.resource_arn - ) - finding_output.GeneratorId = "prowler-" + finding.check_metadata.CheckID - finding_output.AwsAccountId = audit_info.audited_account - finding_output.Types = finding.check_metadata.CheckType - finding_output.FirstObservedAt = ( - finding_output.UpdatedAt - ) = finding_output.CreatedAt = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ") - finding_output.Severity = Severity(Label=finding.check_metadata.Severity.upper()) - finding_output.Title = finding.check_metadata.CheckTitle - finding_output.Description = finding.status_extended - finding_output.Resources = [ - Resource( - Id=finding.resource_arn, - Type=finding.check_metadata.ResourceType, - Partition=audit_info.audited_partition, - Region=finding.region, + try: + # Check if there are no resources in the finding + if finding.resource_arn == "": + if finding.resource_id == "": + finding.resource_id = "NONE_PROVIDED" + finding.resource_arn = finding.resource_id + finding_output.Id = f"prowler-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{hash_sha512(finding.resource_id)}" + finding_output.ProductArn = f"arn:{audit_info.audited_partition}:securityhub:{finding.region}::product/prowler/prowler" + finding_output.ProductFields = ProductFields( + ProviderVersion=prowler_version, ProwlerResourceName=finding.resource_arn ) - ] - # Iterate for each compliance framework - compliance_summary = [] - associated_standards = [] - check_compliance = get_check_compliance(finding, "aws", output_options) - for key, value in check_compliance.items(): - associated_standards.append({"StandardsId": key}) - item = f"{key} {' '.join(value)}" - if len(item) > 64: - item = item[0:63] - compliance_summary.append(item) + finding_output.GeneratorId = "prowler-" + finding.check_metadata.CheckID + finding_output.AwsAccountId = audit_info.audited_account + finding_output.Types = finding.check_metadata.CheckType + finding_output.FirstObservedAt = ( + finding_output.UpdatedAt + ) = finding_output.CreatedAt = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ") + finding_output.Severity = Severity( + Label=finding.check_metadata.Severity.upper() + ) + finding_output.Title = finding.check_metadata.CheckTitle + finding_output.Description = finding.status_extended + finding_output.Resources = [ + Resource( + Id=finding.resource_arn, + Type=finding.check_metadata.ResourceType, + Partition=audit_info.audited_partition, + Region=finding.region, + ) + ] + # Iterate for each compliance framework + compliance_summary = [] + associated_standards = [] + check_compliance = get_check_compliance(finding, "aws", output_options) + for key, value in check_compliance.items(): + associated_standards.append({"StandardsId": key}) + item = f"{key} {' '.join(value)}" + if len(item) > 64: + item = item[0:63] + compliance_summary.append(item) - # Ensures finding_status matches allowed values in ASFF - finding_status = generate_json_asff_status(finding.status) + # Ensures finding_status matches allowed values in ASFF + finding_status = generate_json_asff_status(finding.status) - finding_output.Compliance = Compliance( - Status=finding_status, - AssociatedStandards=associated_standards, - RelatedRequirements=compliance_summary, - ) - # Fill Recommendation Url if it is blank - if not finding.check_metadata.Remediation.Recommendation.Url: - finding.check_metadata.Remediation.Recommendation.Url = "https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html" - finding_output.Remediation = { - "Recommendation": finding.check_metadata.Remediation.Recommendation - } + finding_output.Compliance = Compliance( + Status=finding_status, + AssociatedStandards=associated_standards, + RelatedRequirements=compliance_summary, + ) + # Fill Recommendation Url if it is blank + if not finding.check_metadata.Remediation.Recommendation.Url: + finding.check_metadata.Remediation.Recommendation.Url = "https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html" + finding_output.Remediation = { + "Recommendation": finding.check_metadata.Remediation.Recommendation + } - return finding_output + return finding_output + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) def generate_json_asff_status(status: str) -> str: @@ -104,137 +111,184 @@ def generate_json_asff_status(status: str) -> str: return json_asff_status -def fill_json_ocsf( - finding_output: Check_Output_JSON_OCSF, audit_info, finding, output_options -): - resource_region = "" - resource_name = "" - resource_uid = "" - finding_uid = "" - resource_labels = finding.resource_tags if finding.resource_tags else [] - if finding.status == "PASS": - finding_output.status = "Success" - finding_output.status_id = 1 - elif finding.status == "FAIL": - finding_output.status = "Failure" - finding_output.status_id = 2 - finding_output.status_detail = finding_output.message = finding.status_extended - finding_output.severity = finding.check_metadata.Severity - if finding_output.severity == "low": - finding_output.severity_id = 2 - elif finding_output.severity == "medium": - finding_output.severity_id = 3 - elif finding_output.severity == "high": - finding_output.severity_id = 4 - elif finding_output.severity == "critical": - finding_output.severity_id = 5 - aws_account_name = "" - aws_org_uid = "" - if ( - hasattr(audit_info, "organizations_metadata") - and audit_info.organizations_metadata - ): - aws_account_name = audit_info.organizations_metadata.account_details_name - aws_org_uid = audit_info.organizations_metadata.account_details_org - finding_output.cloud = Cloud( - provider=finding.check_metadata.Provider, - ) - if finding.check_metadata.Provider == "aws": - finding_output.cloud.account = Account( - name=aws_account_name, - uid=audit_info.audited_account, +def fill_json_ocsf(audit_info, finding, output_options) -> Check_Output_JSON_OCSF: + try: + resource_region = "" + resource_name = "" + resource_uid = "" + finding_uid = "" + project_uid = "" + resource_labels = finding.resource_tags if finding.resource_tags else [] + aws_account_name = "" + aws_org_uid = "" + account = None + org = None + if ( + hasattr(audit_info, "organizations_metadata") + and audit_info.organizations_metadata + ): + aws_account_name = audit_info.organizations_metadata.account_details_name + aws_org_uid = audit_info.organizations_metadata.account_details_org + if finding.check_metadata.Provider == "aws": + account = Account( + name=aws_account_name, + uid=audit_info.audited_account, + ) + org = Organization( + name=aws_org_uid, + uid=aws_org_uid, + ) + resource_region = finding.region + resource_name = finding.resource_id + resource_uid = finding.resource_arn + finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}" + elif finding.check_metadata.Provider == "azure": + account = Account( + name=finding.subscription, + uid=finding.subscription, + ) + org = Organization( + name=audit_info.identity.domain, + uid=audit_info.identity.domain, + ) + resource_name = finding.resource_name + resource_uid = finding.resource_id + finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}" + elif finding.check_metadata.Provider == "gcp": + project_uid = finding.project_id + resource_region = finding.location + resource_name = finding.resource_name + resource_uid = finding.resource_id + finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.project_id}-{finding.resource_id}" + cloud = Cloud( + provider=finding.check_metadata.Provider, + org=org, + account=account, + region=resource_region, + project_uid=project_uid, ) - finding_output.cloud.org = Organization( - name=aws_org_uid, - uid=aws_org_uid, + finding_ocsf = Finding( + title=finding.check_metadata.CheckTitle, + uid=finding_uid, + desc=finding.check_metadata.Description, + supporting_data={ + "Risk": finding.check_metadata.Risk, + "Notes": finding.check_metadata.Notes, + }, + related_events=finding.check_metadata.DependsOn + + finding.check_metadata.RelatedTo, + remediation=Remediation_OCSF( + kb_articles=list( + filter( + None, + [ + finding.check_metadata.Remediation.Code.NativeIaC, + finding.check_metadata.Remediation.Code.Terraform, + finding.check_metadata.Remediation.Code.CLI, + finding.check_metadata.Remediation.Code.Other, + finding.check_metadata.Remediation.Recommendation.Url, + ], + ) + ), + desc=finding.check_metadata.Remediation.Recommendation.Text, + ), + types=finding.check_metadata.CheckType, + src_url=finding.check_metadata.RelatedUrl, ) - finding_output.cloud.region = resource_region = finding.region - resource_name = finding.resource_id - resource_uid = finding.resource_arn - finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}" - elif finding.check_metadata.Provider == "azure": - finding_output.cloud.account = Account( - name=finding.subscription, - uid=finding.subscription, + resources = [] + resources.append( + Resources( + group=Group(name=finding.check_metadata.ServiceName), + region=resource_region, + name=resource_name, + labels=resource_labels, + uid=resource_uid, + type=finding.check_metadata.ResourceType, + details=finding.resource_details, + ) ) - finding_output.cloud.org = Organization( - name=audit_info.identity.domain, - uid=audit_info.identity.domain, - ) - resource_name = finding.resource_name - resource_uid = finding.resource_id - finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}" - elif finding.check_metadata.Provider == "gcp": - finding_output.cloud.account = None - finding_output.cloud.org = None - finding_output.cloud.project_uid = finding.project_id - finding_output.cloud.region = resource_region = finding.location - resource_name = finding.resource_name - resource_uid = finding.resource_id - finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.project_id}-{finding.resource_id}" - finding_output.finding = Finding( - title=finding.check_metadata.CheckTitle, - uid=finding_uid, - desc=finding.check_metadata.Description, - supporting_data={ - "Risk": finding.check_metadata.Risk, - "Notes": finding.check_metadata.Notes, - }, - related_events=finding.check_metadata.DependsOn - + finding.check_metadata.RelatedTo, - remediation=Remediation_OCSF( - kb_articles=list( - filter( - None, - [ - finding.check_metadata.Remediation.Code.NativeIaC, - finding.check_metadata.Remediation.Code.Terraform, - finding.check_metadata.Remediation.Code.CLI, - finding.check_metadata.Remediation.Code.Other, - finding.check_metadata.Remediation.Recommendation.Url, - ], + metadata = Metadata( + product=Product( + feature=Feature( + uid=finding.check_metadata.CheckID, + name=finding.check_metadata.CheckID, ) ), - desc=finding.check_metadata.Remediation.Recommendation.Text, - ), - types=finding.check_metadata.CheckType, - src_url=finding.check_metadata.RelatedUrl, - ) - finding_output.resources.append( - Resources( - group=Group(name=finding.check_metadata.ServiceName), - region=resource_region, - name=resource_name, - labels=resource_labels, - uid=resource_uid, - type=finding.check_metadata.ResourceType, - details=finding.resource_details, + original_time=timestamp.isoformat(), + profiles=[audit_info.profile] + if hasattr(audit_info, "organizations_metadata") + else [], + ) + compliance = Compliance_OCSF( + status=generate_json_ocsf_status(finding.status), + status_detail=finding.status_extended, + requirements=unroll_dict_to_list( + get_check_compliance( + finding, finding.check_metadata.Provider, output_options + ) + ), + ) + finding_output = Check_Output_JSON_OCSF( + finding=finding_ocsf, + resources=resources, + status_detail=finding.status_extended, + message=finding.status_extended, + severity=finding.check_metadata.Severity.capitalize(), + severity_id=generate_json_ocsf_severity_id(finding.check_metadata.Severity), + status=generate_json_ocsf_status(finding.status), + status_id=generate_json_ocsf_status_id(finding.status), + compliance=compliance, + cloud=cloud, + time=timestamp.isoformat(), + metadata=metadata, + ) + return finding_output + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - ) - finding_output.time = timestamp.isoformat() - finding_output.metadata = Metadata( - product=Product( - feature=Feature( - uid=finding.check_metadata.CheckID, - name=finding.check_metadata.CheckID, - ) - ), - original_time=timestamp.isoformat(), - profiles=[audit_info.profile] - if hasattr(audit_info, "organizations_metadata") - else [], - ) - finding_output.compliance = Compliance_OCSF( - status=finding_output.status, - status_detail=finding_output.status_detail, - requirements=unroll_dict_to_list( - get_check_compliance( - finding, finding.check_metadata.Provider, output_options - ) - ), - ) - return finding_output + +def generate_json_ocsf_status(status: str): + json_ocsf_status = "" + if status == "PASS": + json_ocsf_status = "Success" + elif status == "FAIL": + json_ocsf_status = "Failure" + elif status == "WARNING": + json_ocsf_status = "Other" + else: + json_ocsf_status = "Unknown" + + return json_ocsf_status + + +def generate_json_ocsf_status_id(status: str): + json_ocsf_status_id = 0 + if status == "PASS": + json_ocsf_status_id = 1 + elif status == "FAIL": + json_ocsf_status_id = 2 + elif status == "WARNING": + json_ocsf_status_id = 99 + else: + json_ocsf_status_id = 0 + + return json_ocsf_status_id + + +def generate_json_ocsf_severity_id(severity: str): + json_ocsf_severity_id = 0 + if severity == "low": + json_ocsf_severity_id = 2 + elif severity == "medium": + json_ocsf_severity_id = 3 + elif severity == "high": + json_ocsf_severity_id = 4 + elif severity == "critical": + json_ocsf_severity_id = 5 + + return json_ocsf_severity_id def close_json(output_filename, output_directory, mode): diff --git a/prowler/lib/outputs/models.py b/prowler/lib/outputs/models.py index 0350941e..47a60024 100644 --- a/prowler/lib/outputs/models.py +++ b/prowler/lib/outputs/models.py @@ -684,11 +684,11 @@ class Organization(BaseModel): class Cloud(BaseModel): - account: Account = None - region: str = "" - org: Organization = None + account: Optional[Account] + region: str + org: Optional[Organization] provider: str - project_uid: str = "" + project_uid: str class Feature(BaseModel): @@ -718,22 +718,22 @@ class Check_Output_JSON_OCSF(BaseModel): https://schema.ocsf.io/1.0.0-rc.3/classes/security_finding """ - finding: Finding = None - resources: List[Resources] = [] - status_detail: str = "" - compliance: Compliance_OCSF = None - message: str = "" - severity_id: Literal[0, 1, 2, 3, 4, 5, 6, 99] = 99 + finding: Finding + resources: List[Resources] + status_detail: str + compliance: Compliance_OCSF + message: str + severity_id: Literal[0, 1, 2, 3, 4, 5, 6, 99] severity: Literal[ "Informational", "Low", "Medium", "High", "Critical", "Fatal", "Other" - ] = "Other" - cloud: Cloud = None - time: datetime = None - metadata: Metadata = None - state_id: str = 0 + ] + cloud: Cloud + time: datetime + metadata: Metadata + state_id: int = 0 state: str = "New" - status_id: Literal[0, 1, 2, 99] = 0 - status: Literal["Unknown", "Success", "Failure", "Other"] = "Unknown" + status_id: Literal[0, 1, 2, 99] + status: Literal["Unknown", "Success", "Failure", "Other"] type_uid: int = 200101 type_name: str = "Security Finding: Create" impact_id: int = 0 diff --git a/prowler/lib/outputs/outputs.py b/prowler/lib/outputs/outputs.py index 3280690a..f4283ab3 100644 --- a/prowler/lib/outputs/outputs.py +++ b/prowler/lib/outputs/outputs.py @@ -19,7 +19,6 @@ from prowler.lib.outputs.html import fill_html from prowler.lib.outputs.json import fill_json_asff, fill_json_ocsf from prowler.lib.outputs.models import ( Check_Output_JSON_ASFF, - Check_Output_JSON_OCSF, generate_provider_output_csv, generate_provider_output_json, unroll_tags, @@ -164,15 +163,15 @@ def report(check_findings, output_options, audit_info): file_descriptors["json"].write(",") if "json-ocsf" in file_descriptors: - finding_output = Check_Output_JSON_OCSF() - fill_json_ocsf( - finding_output, audit_info, finding, output_options + finding_output = fill_json_ocsf( + audit_info, finding, output_options ) json.dump( finding_output.dict(), file_descriptors["json-ocsf"], indent=4, + default=str, ) file_descriptors["json-ocsf"].write(",") diff --git a/tests/lib/outputs/outputs_test.py b/tests/lib/outputs/outputs_test.py index 3abb5566..b28c74e8 100644 --- a/tests/lib/outputs/outputs_test.py +++ b/tests/lib/outputs/outputs_test.py @@ -15,6 +15,7 @@ from prowler.config.config import ( orange_color, output_file_timestamp, prowler_version, + timestamp, timestamp_utc, ) from prowler.lib.check.compliance_models import ( @@ -24,13 +25,32 @@ from prowler.lib.check.compliance_models import ( ) from prowler.lib.check.models import Check_Report, load_check_metadata from prowler.lib.outputs.file_descriptors import fill_file_descriptors -from prowler.lib.outputs.json import fill_json_asff, generate_json_asff_status +from prowler.lib.outputs.json import ( + fill_json_asff, + fill_json_ocsf, + generate_json_asff_status, + generate_json_ocsf_severity_id, + generate_json_ocsf_status, + generate_json_ocsf_status_id, +) from prowler.lib.outputs.models import ( + Account, Check_Output_CSV, Check_Output_JSON_ASFF, + Check_Output_JSON_OCSF, + Cloud, Compliance, + Compliance_OCSF, + Feature, + Finding, + Group, + Metadata, + Organization, + Product, ProductFields, + Remediation_OCSF, Resource, + Resources, Severity, generate_csv_fields, get_check_compliance, @@ -553,6 +573,128 @@ class Test_Outputs: fill_json_asff(input, input_audit_info, finding, output_options) == expected ) + def test_fill_json_ocsf(self): + input_audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=None, + audited_account=AWS_ACCOUNT_ID, + audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root", + audited_identity_arn="test-arn", + audited_user_id="test", + audited_partition="aws", + profile="default", + profile_region="eu-west-1", + credentials=None, + assumed_role_info=None, + audited_regions=["eu-west-2", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + ) + finding = Check_Report( + load_check_metadata( + f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json" + ).json() + ) + finding.resource_details = "Test resource details" + finding.resource_id = "test-resource" + finding.resource_arn = "test-arn" + finding.region = "eu-west-1" + finding.status = "PASS" + finding.status_extended = "This is a test" + + expected = Check_Output_JSON_OCSF( + finding=Finding( + title="Ensure credentials unused for 30 days or greater are disabled", + desc="Ensure credentials unused for 30 days or greater are disabled", + supporting_data={ + "Risk": "Risk associated.", + "Notes": "additional information", + }, + remediation=Remediation_OCSF( + kb_articles=[ + "code or URL to the code location.", + "code or URL to the code location.", + "cli command or URL to the cli command location.", + "cli command or URL to the cli command location.", + "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html", + ], + desc="Run sudo yum update and cross your fingers and toes.", + ), + types=["Software and Configuration Checks"], + src_url="https://serviceofficialsiteorpageforthissubject", + uid="prowler-aws-iam_disable_30_days_credentials-123456789012-eu-west-1-test-resource", + related_events=[ + "othercheck1", + "othercheck2", + "othercheck3", + "othercheck4", + ], + ), + resources=[ + Resources( + group=Group(name="iam"), + region="eu-west-1", + name="test-resource", + uid="test-arn", + labels=[], + type="AwsIamAccessAnalyzer", + details="Test resource details", + ) + ], + status_detail="This is a test", + compliance=Compliance_OCSF( + status="Success", requirements=[], status_detail="This is a test" + ), + message="This is a test", + severity_id=2, + severity="Low", + cloud=Cloud( + account=Account(name="", uid="123456789012"), + region="eu-west-1", + org=Organization(uid="", name=""), + provider="aws", + project_uid="", + ), + time=timestamp.isoformat(), + metadata=Metadata( + original_time=timestamp.isoformat(), + profiles=["default"], + product=Product( + language="en", + name="Prowler", + version="3.6.1", + vendor_name="Prowler/ProwlerPro", + feature=Feature( + name="iam_disable_30_days_credentials", + uid="iam_disable_30_days_credentials", + version="3.6.1", + ), + ), + version="1.0.0-rc.3", + ), + state_id=0, + state="New", + status_id=1, + status="Success", + type_uid=200101, + type_name="Security Finding: Create", + impact_id=0, + impact="Unknown", + confidence_id=0, + confidence="Unknown", + activity_id=1, + activity_name="Create", + category_uid=2, + category_name="Findings", + class_uid=2001, + class_name="Security Finding", + ) + output_options = mock.MagicMock() + + assert fill_json_ocsf(input_audit_info, finding, output_options) == expected + @mock_s3 def test_send_to_s3_bucket(self): # Create mock session @@ -923,3 +1065,22 @@ class Test_Outputs: assert generate_json_asff_status("FAIL") == "FAILED" assert generate_json_asff_status("WARNING") == "WARNING" assert generate_json_asff_status("SOMETHING ELSE") == "NOT_AVAILABLE" + + def test_generate_json_ocsf_status(self): + assert generate_json_ocsf_status("PASS") == "Success" + assert generate_json_ocsf_status("FAIL") == "Failure" + assert generate_json_ocsf_status("WARNING") == "Other" + assert generate_json_ocsf_status("SOMETHING ELSE") == "Unknown" + + def test_generate_json_ocsf_status_id(self): + assert generate_json_ocsf_status_id("PASS") == 1 + assert generate_json_ocsf_status_id("FAIL") == 2 + assert generate_json_ocsf_status_id("WARNING") == 99 + assert generate_json_ocsf_status_id("SOMETHING ELSE") == 0 + + def test_generate_json_ocsf_severity_id(self): + assert generate_json_ocsf_severity_id("low") == 2 + assert generate_json_ocsf_severity_id("medium") == 3 + assert generate_json_ocsf_severity_id("high") == 4 + assert generate_json_ocsf_severity_id("critical") == 5 + assert generate_json_ocsf_severity_id("something else") == 0