feat(oscf): add OCSF format as JSON output for AWS, Azure and GCP. Hello Amazon Security Lake! (#2429)

This commit is contained in:
Sergio Garcia
2023-06-07 14:28:43 +02:00
committed by GitHub
parent 3e3e8a14ee
commit cdf2a13bbd
9 changed files with 335 additions and 15 deletions

View File

@@ -189,7 +189,7 @@ def prowler():
if args.output_modes:
for mode in args.output_modes:
# Close json file if exists
if mode == "json" or mode == "json-asff":
if "json" in mode:
close_json(
audit_output_options.output_filename, args.output_directory, mode
)

View File

@@ -46,6 +46,7 @@ timestamp_iso = timestamp.isoformat(sep=" ", timespec="seconds")
csv_file_suffix = ".csv"
json_file_suffix = ".json"
json_asff_file_suffix = ".asff.json"
json_ocsf_file_suffix = ".ocsf.json"
html_file_suffix = ".html"
config_yaml = f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"

View File

@@ -124,8 +124,8 @@ Detailed documentation at https://docs.prowler.cloud
"--output-modes",
nargs="+",
help="Output modes, by default csv, html and json",
default=["csv", "json", "html"],
choices=["csv", "json", "json-asff", "html"],
default=["csv", "json", "html", "json-ocsf"],
choices=["csv", "json", "json-asff", "html", "json-ocsf"],
)
common_outputs_parser.add_argument(
"-F",

View File

@@ -7,6 +7,7 @@ from prowler.config.config import (
html_file_suffix,
json_asff_file_suffix,
json_file_suffix,
json_ocsf_file_suffix,
)
from prowler.lib.logger import logger
from prowler.lib.outputs.html import add_html_header
@@ -45,7 +46,7 @@ def initialize_file_descriptor(
"a",
)
if output_mode in ("json", "json-asff"):
if output_mode in ("json", "json-asff", "json-ocsf"):
file_descriptor.write("[")
elif "html" in output_mode:
add_html_header(file_descriptor, audit_info)
@@ -101,6 +102,15 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, audit
)
file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "json-ocsf":
filename = (
f"{output_directory}/{output_filename}{json_ocsf_file_suffix}"
)
file_descriptor = initialize_file_descriptor(
filename, output_mode, audit_info
)
file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "html":
filename = f"{output_directory}/{output_filename}{html_file_suffix}"
file_descriptor = initialize_file_descriptor(

View File

@@ -4,16 +4,31 @@ import sys
from prowler.config.config import (
json_asff_file_suffix,
json_file_suffix,
json_ocsf_file_suffix,
prowler_version,
timestamp,
timestamp_utc,
)
from prowler.lib.logger import logger
from prowler.lib.outputs.models import (
Account,
Check_Output_JSON_OCSF,
Cloud,
Compliance,
Compliance_OCSF,
Feature,
Finding,
Group,
Metadata,
Organization,
Product,
ProductFields,
Remediation_OCSF,
Resource,
Resources,
Severity,
get_check_compliance,
unroll_dict_to_list,
)
from prowler.lib.utils.utils import hash_sha512, open_file
@@ -70,12 +85,147 @@ def fill_json_asff(finding_output, audit_info, finding, output_options):
return finding_output
def fill_json_ocsf(
finding_output: Check_Output_JSON_OCSF, audit_info, finding, output_options
):
resource_region = ""
resource_name = ""
resource_uid = ""
finding_uid = ""
resource_labels = finding.resource_tags if finding.resource_tags else []
if finding.status == "PASS":
finding_output.status = "Success"
finding_output.status_id = 1
elif finding.status == "FAIL":
finding_output.status = "Failure"
finding_output.status_id = 2
finding_output.status_detail = finding_output.message = finding.status_extended
finding_output.severity = finding.check_metadata.Severity
if finding_output.severity == "low":
finding_output.severity_id = 2
elif finding_output.severity == "medium":
finding_output.severity_id = 3
elif finding_output.severity == "high":
finding_output.severity_id = 4
elif finding_output.severity == "critical":
finding_output.severity_id = 5
aws_account_name = ""
aws_org_uid = ""
if (
hasattr(audit_info, "organizations_metadata")
and audit_info.organizations_metadata
):
aws_account_name = audit_info.organizations_metadata.account_details_name
aws_org_uid = audit_info.organizations_metadata.account_details_org
finding_output.cloud = Cloud(
provider=finding.check_metadata.Provider,
)
if finding.check_metadata.Provider == "aws":
finding_output.cloud.account = Account(
name=aws_account_name,
uid=audit_info.audited_account,
)
finding_output.cloud.org = Organization(
name=aws_org_uid,
uid=aws_org_uid,
)
finding_output.cloud.region = resource_region = finding.region
resource_name = finding.resource_id
resource_uid = finding.resource_arn
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}"
elif finding.check_metadata.Provider == "azure":
finding_output.cloud.account = Account(
name=finding.subscription,
uid=finding.subscription,
)
finding_output.cloud.org = Organization(
name=audit_info.identity.domain,
uid=audit_info.identity.domain,
)
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}"
elif finding.check_metadata.Provider == "gcp":
finding_output.cloud.account = None
finding_output.cloud.org = None
finding_output.cloud.project_uid = finding.project_id
finding_output.cloud.region = resource_region = finding.location
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.project_id}-{finding.resource_id}"
finding_output.finding = Finding(
title=finding.check_metadata.CheckTitle,
uid=finding_uid,
desc=finding.check_metadata.Description,
supporting_data={
"Risk": finding.check_metadata.Risk,
"Notes": finding.check_metadata.Notes,
},
related_events=finding.check_metadata.DependsOn
+ finding.check_metadata.RelatedTo,
remediation=Remediation_OCSF(
kb_articles=list(
filter(
None,
[
finding.check_metadata.Remediation.Code.NativeIaC,
finding.check_metadata.Remediation.Code.Terraform,
finding.check_metadata.Remediation.Code.CLI,
finding.check_metadata.Remediation.Code.Other,
finding.check_metadata.Remediation.Recommendation.Url,
],
)
),
desc=finding.check_metadata.Remediation.Recommendation.Text,
),
types=finding.check_metadata.CheckType,
src_url=finding.check_metadata.RelatedUrl,
)
finding_output.resources.append(
Resources(
group=Group(name=finding.check_metadata.ServiceName),
region=resource_region,
name=resource_name,
labels=resource_labels,
uid=resource_uid,
type=finding.check_metadata.ResourceType,
details=finding.resource_details,
)
)
finding_output.time = timestamp.isoformat()
finding_output.metadata = Metadata(
product=Product(
feature=Feature(
uid=finding.check_metadata.CheckID,
name=finding.check_metadata.CheckID,
)
),
original_time=timestamp.isoformat(),
profiles=[audit_info.profile]
if hasattr(audit_info, "organizations_metadata")
else [],
)
finding_output.compliance = Compliance_OCSF(
status=finding_output.status,
status_detail=finding_output.status_detail,
requirements=unroll_dict_to_list(
get_check_compliance(
finding, finding.check_metadata.Provider, output_options
)
),
)
return finding_output
def close_json(output_filename, output_directory, mode):
"""close_json closes the output JSON file replacing the last comma with ]"""
try:
suffix = json_file_suffix
if mode == "json-asff":
suffix = json_asff_file_suffix
elif mode == "json-ocsf":
suffix = json_ocsf_file_suffix
filename = f"{output_directory}/{output_filename}{suffix}"
# Close JSON file if exists
if os.path.isfile(filename):

View File

@@ -1,11 +1,12 @@
import importlib
import sys
from csv import DictWriter
from typing import Any, List, Optional
from datetime import datetime
from typing import Any, List, Literal, Optional
from pydantic import BaseModel
from prowler.config.config import timestamp
from prowler.config.config import prowler_version, timestamp
from prowler.lib.check.models import Remediation
from prowler.lib.logger import logger
from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info
@@ -228,6 +229,18 @@ def unroll_dict(dict: dict):
return unrolled_items
def unroll_dict_to_list(dict: dict):
list = []
for key, value in dict.items():
if type(value) == list:
value = ", ".join(value)
list.append(f"{key}: {value}")
else:
list.append(f"{key}: {value}")
return list
def parse_html_string(str: str):
string = ""
for elem in str.split(" | "):
@@ -621,3 +634,115 @@ class Check_Output_JSON_ASFF(BaseModel):
Resources: List[Resource] = None
Compliance: Compliance = None
Remediation: dict = None
# JSON OCSF
class Remediation_OCSF(BaseModel):
kb_articles: List[str]
desc: str
class Finding(BaseModel):
title: str
desc: str
supporting_data: dict
remediation: Remediation_OCSF
types: List[str]
src_url: str
uid: str
related_events: List[str]
class Group(BaseModel):
name: str
class Resources(BaseModel):
group: Group
region: str
name: str
uid: str
labels: list
type: str
details: str
class Compliance_OCSF(BaseModel):
status: str
requirements: List[str]
status_detail: str
class Account(BaseModel):
name: str
uid: str
class Organization(BaseModel):
uid: str
name: str
class Cloud(BaseModel):
account: Account = None
region: str = ""
org: Organization = None
provider: str
project_uid: str = ""
class Feature(BaseModel):
name: str
uid: str
version: str = prowler_version
class Product(BaseModel):
language: str = "en"
name: str = "Prowler"
version: str = prowler_version
vendor_name: str = "Prowler/ProwlerPro"
feature: Feature
class Metadata(BaseModel):
original_time: str
profiles: List[str]
product: Product
version: str = "1.0.0-rc.3"
class Check_Output_JSON_OCSF(BaseModel):
"""
Check_Output_JSON_OCSF generates a finding's output in JSON OCSF format.
https://schema.ocsf.io/1.0.0-rc.3/classes/security_finding
"""
finding: Finding = None
resources: List[Resources] = []
status_detail: str = ""
compliance: Compliance_OCSF = None
message: str = ""
severity_id: Literal[0, 1, 2, 3, 4, 5, 6, 99] = 99
severity: Literal[
"Informational", "Low", "Medium", "High", "Critical", "Fatal", "Other"
] = "Other"
cloud: Cloud = None
time: datetime = None
metadata: Metadata = None
state_id: str = 0
state: str = "New"
status_id: Literal[0, 1, 2, 99] = 0
status: Literal["Unknown", "Success", "Failure", "Other"] = "Unknown"
type_uid: int = 200101
type_name: str = "Security Finding: Create"
impact_id: int = 0
impact: str = "Unknown"
confidence_id: int = 0
confidence: str = "Unknown"
activity_id: int = 1
activity_name: str = "Create"
category_uid: int = 2
category_name: str = "Findings"
class_uid: int = 2001
class_name: str = "Security Finding"

View File

@@ -9,15 +9,17 @@ from prowler.config.config import (
html_file_suffix,
json_asff_file_suffix,
json_file_suffix,
json_ocsf_file_suffix,
orange_color,
)
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance import add_manual_controls, fill_compliance
from prowler.lib.outputs.file_descriptors import fill_file_descriptors
from prowler.lib.outputs.html import fill_html
from prowler.lib.outputs.json import fill_json_asff
from prowler.lib.outputs.json import fill_json_asff, fill_json_ocsf
from prowler.lib.outputs.models import (
Check_Output_JSON_ASFF,
Check_Output_JSON_OCSF,
generate_provider_output_csv,
generate_provider_output_json,
unroll_tags,
@@ -161,6 +163,19 @@ def report(check_findings, output_options, audit_info):
)
file_descriptors["json"].write(",")
if "json-ocsf" in file_descriptors:
finding_output = Check_Output_JSON_OCSF()
fill_json_ocsf(
finding_output, audit_info, finding, output_options
)
json.dump(
finding_output.dict(),
file_descriptors["json-ocsf"],
indent=4,
)
file_descriptors["json-ocsf"].write(",")
else: # No service resources in the whole account
color = set_report_color("INFO")
if output_options.verbose:
@@ -208,6 +223,8 @@ def send_to_s3_bucket(
filename = f"{output_filename}{json_file_suffix}"
elif output_mode == "json-asff":
filename = f"{output_filename}{json_asff_file_suffix}"
elif output_mode == "json-ocsf":
filename = f"{output_filename}{json_ocsf_file_suffix}"
elif output_mode == "html":
filename = f"{output_filename}{html_file_suffix}"
else: # Compliance output mode

View File

@@ -3,6 +3,13 @@ import sys
from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import (
csv_file_suffix,
html_file_suffix,
json_asff_file_suffix,
json_file_suffix,
json_ocsf_file_suffix,
)
from prowler.lib.logger import logger
from prowler.providers.common.outputs import Provider_Output_Options
@@ -108,13 +115,23 @@ def display_summary_table(
)
print("\nDetailed results are in:")
if "html" in output_options.output_modes:
print(f" - HTML: {output_directory}/{output_filename}.html")
print(
f" - HTML: {output_directory}/{output_filename}{html_file_suffix}"
)
if "json-asff" in output_options.output_modes:
print(f" - JSON-ASFF: {output_directory}/{output_filename}.asff.json")
print(
f" - JSON-ASFF: {output_directory}/{output_filename}{json_asff_file_suffix}"
)
if "json-ocsf" in output_options.output_modes:
print(
f" - JSON-OCSF: {output_directory}/{output_filename}{json_ocsf_file_suffix}"
)
if "csv" in output_options.output_modes:
print(f" - CSV: {output_directory}/{output_filename}.csv")
print(f" - CSV: {output_directory}/{output_filename}{csv_file_suffix}")
if "json" in output_options.output_modes:
print(f" - JSON: {output_directory}/{output_filename}.json")
print(
f" - JSON: {output_directory}/{output_filename}{json_file_suffix}"
)
else:
print(

View File

@@ -18,7 +18,7 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.provider == provider
assert not parsed.quiet
assert len(parsed.output_modes) == 3
assert len(parsed.output_modes) == 4
assert "csv" in parsed.output_modes
assert "html" in parsed.output_modes
assert "json" in parsed.output_modes
@@ -64,7 +64,7 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.provider == provider
assert not parsed.quiet
assert len(parsed.output_modes) == 3
assert len(parsed.output_modes) == 4
assert "csv" in parsed.output_modes
assert "html" in parsed.output_modes
assert "json" in parsed.output_modes
@@ -102,7 +102,7 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.provider == provider
assert not parsed.quiet
assert len(parsed.output_modes) == 3
assert len(parsed.output_modes) == 4
assert "csv" in parsed.output_modes
assert "html" in parsed.output_modes
assert "json" in parsed.output_modes
@@ -203,7 +203,7 @@ class Test_Parser:
def test_root_parser_default_output_modes(self):
command = [prowler_command]
parsed = self.parser.parse(command)
assert len(parsed.output_modes) == 3
assert len(parsed.output_modes) == 4
assert "csv" in parsed.output_modes
assert "json" in parsed.output_modes
assert "html" in parsed.output_modes