chore(OCSF): improve OCSF logic (#2502)

Author: Sergio Garcia
Date: 2023-06-19 12:37:04 +02:00
Committed by: GitHub
Parent: 60c341befd
Commit: 8ea5ba5d3f
4 changed files with 410 additions and 196 deletions

@@ -34,60 +34,67 @@ from prowler.lib.utils.utils import hash_sha512, open_file
def fill_json_asff(finding_output, audit_info, finding, output_options):
# Check if there are no resources in the finding
if finding.resource_arn == "":
if finding.resource_id == "":
finding.resource_id = "NONE_PROVIDED"
finding.resource_arn = finding.resource_id
finding_output.Id = f"prowler-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{hash_sha512(finding.resource_id)}"
finding_output.ProductArn = f"arn:{audit_info.audited_partition}:securityhub:{finding.region}::product/prowler/prowler"
finding_output.ProductFields = ProductFields(
ProviderVersion=prowler_version, ProwlerResourceName=finding.resource_arn
)
finding_output.GeneratorId = "prowler-" + finding.check_metadata.CheckID
finding_output.AwsAccountId = audit_info.audited_account
finding_output.Types = finding.check_metadata.CheckType
finding_output.FirstObservedAt = (
finding_output.UpdatedAt
) = finding_output.CreatedAt = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
finding_output.Severity = Severity(Label=finding.check_metadata.Severity.upper())
finding_output.Title = finding.check_metadata.CheckTitle
finding_output.Description = finding.status_extended
finding_output.Resources = [
Resource(
Id=finding.resource_arn,
Type=finding.check_metadata.ResourceType,
Partition=audit_info.audited_partition,
Region=finding.region,
try:
# Check if there are no resources in the finding
if finding.resource_arn == "":
if finding.resource_id == "":
finding.resource_id = "NONE_PROVIDED"
finding.resource_arn = finding.resource_id
finding_output.Id = f"prowler-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{hash_sha512(finding.resource_id)}"
finding_output.ProductArn = f"arn:{audit_info.audited_partition}:securityhub:{finding.region}::product/prowler/prowler"
finding_output.ProductFields = ProductFields(
ProviderVersion=prowler_version, ProwlerResourceName=finding.resource_arn
)
]
# Iterate for each compliance framework
compliance_summary = []
associated_standards = []
check_compliance = get_check_compliance(finding, "aws", output_options)
for key, value in check_compliance.items():
associated_standards.append({"StandardsId": key})
item = f"{key} {' '.join(value)}"
if len(item) > 64:
item = item[0:63]
compliance_summary.append(item)
finding_output.GeneratorId = "prowler-" + finding.check_metadata.CheckID
finding_output.AwsAccountId = audit_info.audited_account
finding_output.Types = finding.check_metadata.CheckType
finding_output.FirstObservedAt = (
finding_output.UpdatedAt
) = finding_output.CreatedAt = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
finding_output.Severity = Severity(
Label=finding.check_metadata.Severity.upper()
)
finding_output.Title = finding.check_metadata.CheckTitle
finding_output.Description = finding.status_extended
finding_output.Resources = [
Resource(
Id=finding.resource_arn,
Type=finding.check_metadata.ResourceType,
Partition=audit_info.audited_partition,
Region=finding.region,
)
]
# Iterate for each compliance framework
compliance_summary = []
associated_standards = []
check_compliance = get_check_compliance(finding, "aws", output_options)
for key, value in check_compliance.items():
associated_standards.append({"StandardsId": key})
item = f"{key} {' '.join(value)}"
if len(item) > 64:
item = item[0:63]
compliance_summary.append(item)
# Ensures finding_status matches allowed values in ASFF
finding_status = generate_json_asff_status(finding.status)
# Ensures finding_status matches allowed values in ASFF
finding_status = generate_json_asff_status(finding.status)
finding_output.Compliance = Compliance(
Status=finding_status,
AssociatedStandards=associated_standards,
RelatedRequirements=compliance_summary,
)
# Fill Recommendation Url if it is blank
if not finding.check_metadata.Remediation.Recommendation.Url:
finding.check_metadata.Remediation.Recommendation.Url = "https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html"
finding_output.Remediation = {
"Recommendation": finding.check_metadata.Remediation.Recommendation
}
finding_output.Compliance = Compliance(
Status=finding_status,
AssociatedStandards=associated_standards,
RelatedRequirements=compliance_summary,
)
# Fill Recommendation Url if it is blank
if not finding.check_metadata.Remediation.Recommendation.Url:
finding.check_metadata.Remediation.Recommendation.Url = "https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html"
finding_output.Remediation = {
"Recommendation": finding.check_metadata.Remediation.Recommendation
}
return finding_output
return finding_output
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
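The refactor wraps the whole of fill_json_asff in a try/except block and logs the exception class together with the traceback line number instead of letting errors propagate. A minimal, self-contained sketch of the same logging pattern, built around a hypothetical fill function rather than the real one:

import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.ERROR)


def fill_example_output(finding):
    try:
        # Hypothetical stand-in for fill_json_asff: any unexpected attribute
        # error is caught and logged with the exception class name and the
        # line number where it was raised; the function then returns None.
        return {"Id": finding.resource_arn.upper()}
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
        )


fill_example_output(object())
# logs something like: AttributeError[...]: 'object' object has no attribute 'resource_arn'

As in the diff, a swallowed exception means the function returns None, so callers need to tolerate that.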
def generate_json_asff_status(status: str) -> str:
@@ -104,137 +111,184 @@ def generate_json_asff_status(status: str) -> str:
return json_asff_status
def fill_json_ocsf(
finding_output: Check_Output_JSON_OCSF, audit_info, finding, output_options
):
resource_region = ""
resource_name = ""
resource_uid = ""
finding_uid = ""
resource_labels = finding.resource_tags if finding.resource_tags else []
if finding.status == "PASS":
finding_output.status = "Success"
finding_output.status_id = 1
elif finding.status == "FAIL":
finding_output.status = "Failure"
finding_output.status_id = 2
finding_output.status_detail = finding_output.message = finding.status_extended
finding_output.severity = finding.check_metadata.Severity
if finding_output.severity == "low":
finding_output.severity_id = 2
elif finding_output.severity == "medium":
finding_output.severity_id = 3
elif finding_output.severity == "high":
finding_output.severity_id = 4
elif finding_output.severity == "critical":
finding_output.severity_id = 5
aws_account_name = ""
aws_org_uid = ""
if (
hasattr(audit_info, "organizations_metadata")
and audit_info.organizations_metadata
):
aws_account_name = audit_info.organizations_metadata.account_details_name
aws_org_uid = audit_info.organizations_metadata.account_details_org
finding_output.cloud = Cloud(
provider=finding.check_metadata.Provider,
)
if finding.check_metadata.Provider == "aws":
finding_output.cloud.account = Account(
name=aws_account_name,
uid=audit_info.audited_account,
def fill_json_ocsf(audit_info, finding, output_options) -> Check_Output_JSON_OCSF:
try:
resource_region = ""
resource_name = ""
resource_uid = ""
finding_uid = ""
project_uid = ""
resource_labels = finding.resource_tags if finding.resource_tags else []
aws_account_name = ""
aws_org_uid = ""
account = None
org = None
if (
hasattr(audit_info, "organizations_metadata")
and audit_info.organizations_metadata
):
aws_account_name = audit_info.organizations_metadata.account_details_name
aws_org_uid = audit_info.organizations_metadata.account_details_org
if finding.check_metadata.Provider == "aws":
account = Account(
name=aws_account_name,
uid=audit_info.audited_account,
)
org = Organization(
name=aws_org_uid,
uid=aws_org_uid,
)
resource_region = finding.region
resource_name = finding.resource_id
resource_uid = finding.resource_arn
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}"
elif finding.check_metadata.Provider == "azure":
account = Account(
name=finding.subscription,
uid=finding.subscription,
)
org = Organization(
name=audit_info.identity.domain,
uid=audit_info.identity.domain,
)
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}"
elif finding.check_metadata.Provider == "gcp":
project_uid = finding.project_id
resource_region = finding.location
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.project_id}-{finding.resource_id}"
cloud = Cloud(
provider=finding.check_metadata.Provider,
org=org,
account=account,
region=resource_region,
project_uid=project_uid,
)
finding_output.cloud.org = Organization(
name=aws_org_uid,
uid=aws_org_uid,
finding_ocsf = Finding(
title=finding.check_metadata.CheckTitle,
uid=finding_uid,
desc=finding.check_metadata.Description,
supporting_data={
"Risk": finding.check_metadata.Risk,
"Notes": finding.check_metadata.Notes,
},
related_events=finding.check_metadata.DependsOn
+ finding.check_metadata.RelatedTo,
remediation=Remediation_OCSF(
kb_articles=list(
filter(
None,
[
finding.check_metadata.Remediation.Code.NativeIaC,
finding.check_metadata.Remediation.Code.Terraform,
finding.check_metadata.Remediation.Code.CLI,
finding.check_metadata.Remediation.Code.Other,
finding.check_metadata.Remediation.Recommendation.Url,
],
)
),
desc=finding.check_metadata.Remediation.Recommendation.Text,
),
types=finding.check_metadata.CheckType,
src_url=finding.check_metadata.RelatedUrl,
)
finding_output.cloud.region = resource_region = finding.region
resource_name = finding.resource_id
resource_uid = finding.resource_arn
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{audit_info.audited_account}-{finding.region}-{finding.resource_id}"
elif finding.check_metadata.Provider == "azure":
finding_output.cloud.account = Account(
name=finding.subscription,
uid=finding.subscription,
resources = []
resources.append(
Resources(
group=Group(name=finding.check_metadata.ServiceName),
region=resource_region,
name=resource_name,
labels=resource_labels,
uid=resource_uid,
type=finding.check_metadata.ResourceType,
details=finding.resource_details,
)
)
finding_output.cloud.org = Organization(
name=audit_info.identity.domain,
uid=audit_info.identity.domain,
)
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.subscription}-{finding.resource_id}"
elif finding.check_metadata.Provider == "gcp":
finding_output.cloud.account = None
finding_output.cloud.org = None
finding_output.cloud.project_uid = finding.project_id
finding_output.cloud.region = resource_region = finding.location
resource_name = finding.resource_name
resource_uid = finding.resource_id
finding_uid = f"prowler-{finding.check_metadata.Provider}-{finding.check_metadata.CheckID}-{finding.project_id}-{finding.resource_id}"
finding_output.finding = Finding(
title=finding.check_metadata.CheckTitle,
uid=finding_uid,
desc=finding.check_metadata.Description,
supporting_data={
"Risk": finding.check_metadata.Risk,
"Notes": finding.check_metadata.Notes,
},
related_events=finding.check_metadata.DependsOn
+ finding.check_metadata.RelatedTo,
remediation=Remediation_OCSF(
kb_articles=list(
filter(
None,
[
finding.check_metadata.Remediation.Code.NativeIaC,
finding.check_metadata.Remediation.Code.Terraform,
finding.check_metadata.Remediation.Code.CLI,
finding.check_metadata.Remediation.Code.Other,
finding.check_metadata.Remediation.Recommendation.Url,
],
metadata = Metadata(
product=Product(
feature=Feature(
uid=finding.check_metadata.CheckID,
name=finding.check_metadata.CheckID,
)
),
desc=finding.check_metadata.Remediation.Recommendation.Text,
),
types=finding.check_metadata.CheckType,
src_url=finding.check_metadata.RelatedUrl,
)
finding_output.resources.append(
Resources(
group=Group(name=finding.check_metadata.ServiceName),
region=resource_region,
name=resource_name,
labels=resource_labels,
uid=resource_uid,
type=finding.check_metadata.ResourceType,
details=finding.resource_details,
original_time=timestamp.isoformat(),
profiles=[audit_info.profile]
if hasattr(audit_info, "organizations_metadata")
else [],
)
compliance = Compliance_OCSF(
status=generate_json_ocsf_status(finding.status),
status_detail=finding.status_extended,
requirements=unroll_dict_to_list(
get_check_compliance(
finding, finding.check_metadata.Provider, output_options
)
),
)
finding_output = Check_Output_JSON_OCSF(
finding=finding_ocsf,
resources=resources,
status_detail=finding.status_extended,
message=finding.status_extended,
severity=finding.check_metadata.Severity.capitalize(),
severity_id=generate_json_ocsf_severity_id(finding.check_metadata.Severity),
status=generate_json_ocsf_status(finding.status),
status_id=generate_json_ocsf_status_id(finding.status),
compliance=compliance,
cloud=cloud,
time=timestamp.isoformat(),
metadata=metadata,
)
return finding_output
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
)
finding_output.time = timestamp.isoformat()
finding_output.metadata = Metadata(
product=Product(
feature=Feature(
uid=finding.check_metadata.CheckID,
name=finding.check_metadata.CheckID,
)
),
original_time=timestamp.isoformat(),
profiles=[audit_info.profile]
if hasattr(audit_info, "organizations_metadata")
else [],
)
finding_output.compliance = Compliance_OCSF(
status=finding_output.status,
status_detail=finding_output.status_detail,
requirements=unroll_dict_to_list(
get_check_compliance(
finding, finding.check_metadata.Provider, output_options
)
),
)
return finding_output
def generate_json_ocsf_status(status: str):
json_ocsf_status = ""
if status == "PASS":
json_ocsf_status = "Success"
elif status == "FAIL":
json_ocsf_status = "Failure"
elif status == "WARNING":
json_ocsf_status = "Other"
else:
json_ocsf_status = "Unknown"
return json_ocsf_status
def generate_json_ocsf_status_id(status: str):
json_ocsf_status_id = 0
if status == "PASS":
json_ocsf_status_id = 1
elif status == "FAIL":
json_ocsf_status_id = 2
elif status == "WARNING":
json_ocsf_status_id = 99
else:
json_ocsf_status_id = 0
return json_ocsf_status_id
def generate_json_ocsf_severity_id(severity: str):
json_ocsf_severity_id = 0
if severity == "low":
json_ocsf_severity_id = 2
elif severity == "medium":
json_ocsf_severity_id = 3
elif severity == "high":
json_ocsf_severity_id = 4
elif severity == "critical":
json_ocsf_severity_id = 5
return json_ocsf_severity_id
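Together, these helpers pin down the mapping from Prowler statuses and severities to the OCSF enumerations, with anything unrecognized falling back to Unknown / 0. A quick sketch exercising them, assuming the three functions above are in scope exactly as defined:

# Assumes generate_json_ocsf_status, generate_json_ocsf_status_id and
# generate_json_ocsf_severity_id from this module are importable as-is.
for status in ("PASS", "FAIL", "WARNING", "SOMETHING ELSE"):
    print(
        status,
        generate_json_ocsf_status(status),
        generate_json_ocsf_status_id(status),
    )
# PASS Success 1
# FAIL Failure 2
# WARNING Other 99
# SOMETHING ELSE Unknown 0

for severity in ("low", "medium", "high", "critical", "something else"):
    print(severity, generate_json_ocsf_severity_id(severity))
# low 2, medium 3, high 4, critical 5, anything else 0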
def close_json(output_filename, output_directory, mode):

@@ -684,11 +684,11 @@ class Organization(BaseModel):
class Cloud(BaseModel):
account: Account = None
region: str = ""
org: Organization = None
account: Optional[Account]
region: str
org: Optional[Organization]
provider: str
project_uid: str = ""
project_uid: str
class Feature(BaseModel):
@@ -718,22 +718,22 @@ class Check_Output_JSON_OCSF(BaseModel):
https://schema.ocsf.io/1.0.0-rc.3/classes/security_finding
"""
finding: Finding = None
resources: List[Resources] = []
status_detail: str = ""
compliance: Compliance_OCSF = None
message: str = ""
severity_id: Literal[0, 1, 2, 3, 4, 5, 6, 99] = 99
finding: Finding
resources: List[Resources]
status_detail: str
compliance: Compliance_OCSF
message: str
severity_id: Literal[0, 1, 2, 3, 4, 5, 6, 99]
severity: Literal[
"Informational", "Low", "Medium", "High", "Critical", "Fatal", "Other"
] = "Other"
cloud: Cloud = None
time: datetime = None
metadata: Metadata = None
state_id: str = 0
]
cloud: Cloud
time: datetime
metadata: Metadata
state_id: int = 0
state: str = "New"
status_id: Literal[0, 1, 2, 99] = 0
status: Literal["Unknown", "Success", "Failure", "Other"] = "Unknown"
status_id: Literal[0, 1, 2, 99]
status: Literal["Unknown", "Success", "Failure", "Other"]
type_uid: int = 200101
type_name: str = "Security Finding: Create"
impact_id: int = 0
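The model changes above drop most defaults so that fields such as finding, resources, cloud and metadata must now be supplied explicitly, use Optional[...] where None is a legitimate value (for example cloud.account and cloud.org on GCP findings), and correct the state_id annotation from str to int. Assuming Pydantic v1, which Prowler 3.x uses, a bare annotation with no default makes a field required, while Optional[...] without a default quietly defaults to None. A toy model, not the real Check_Output_JSON_OCSF, to illustrate the distinction:

from typing import Optional

from pydantic import BaseModel, ValidationError


class ToyCloud(BaseModel):
    # Mirrors the shape of the change only; this is not the real Cloud model.
    provider: str           # no default -> required
    region: str             # no default -> required
    project_uid: str        # no default -> required
    account: Optional[str]  # Optional without default -> defaults to None (Pydantic v1)


print(ToyCloud(provider="gcp", region="eu-west-1", project_uid="my-project").account)  # None

try:
    ToyCloud(provider="aws")
except ValidationError as error:
    print(error)  # region and project_uid are reported as missing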

@@ -19,7 +19,6 @@ from prowler.lib.outputs.html import fill_html
from prowler.lib.outputs.json import fill_json_asff, fill_json_ocsf
from prowler.lib.outputs.models import (
Check_Output_JSON_ASFF,
Check_Output_JSON_OCSF,
generate_provider_output_csv,
generate_provider_output_json,
unroll_tags,
@@ -164,15 +163,15 @@ def report(check_findings, output_options, audit_info):
file_descriptors["json"].write(",")
if "json-ocsf" in file_descriptors:
finding_output = Check_Output_JSON_OCSF()
fill_json_ocsf(
finding_output, audit_info, finding, output_options
finding_output = fill_json_ocsf(
audit_info, finding, output_options
)
json.dump(
finding_output.dict(),
file_descriptors["json-ocsf"],
indent=4,
default=str,
)
file_descriptors["json-ocsf"].write(",")

@@ -15,6 +15,7 @@ from prowler.config.config import (
orange_color,
output_file_timestamp,
prowler_version,
timestamp,
timestamp_utc,
)
from prowler.lib.check.compliance_models import (
@@ -24,13 +25,32 @@ from prowler.lib.check.compliance_models import (
)
from prowler.lib.check.models import Check_Report, load_check_metadata
from prowler.lib.outputs.file_descriptors import fill_file_descriptors
from prowler.lib.outputs.json import fill_json_asff, generate_json_asff_status
from prowler.lib.outputs.json import (
fill_json_asff,
fill_json_ocsf,
generate_json_asff_status,
generate_json_ocsf_severity_id,
generate_json_ocsf_status,
generate_json_ocsf_status_id,
)
from prowler.lib.outputs.models import (
Account,
Check_Output_CSV,
Check_Output_JSON_ASFF,
Check_Output_JSON_OCSF,
Cloud,
Compliance,
Compliance_OCSF,
Feature,
Finding,
Group,
Metadata,
Organization,
Product,
ProductFields,
Remediation_OCSF,
Resource,
Resources,
Severity,
generate_csv_fields,
get_check_compliance,
@@ -553,6 +573,128 @@ class Test_Outputs:
fill_json_asff(input, input_audit_info, finding, output_options) == expected
)
def test_fill_json_ocsf(self):
input_audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=None,
audited_account=AWS_ACCOUNT_ID,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_ID}:root",
audited_identity_arn="test-arn",
audited_user_id="test",
audited_partition="aws",
profile="default",
profile_region="eu-west-1",
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
)
finding = Check_Report(
load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/fixtures/metadata.json"
).json()
)
finding.resource_details = "Test resource details"
finding.resource_id = "test-resource"
finding.resource_arn = "test-arn"
finding.region = "eu-west-1"
finding.status = "PASS"
finding.status_extended = "This is a test"
expected = Check_Output_JSON_OCSF(
finding=Finding(
title="Ensure credentials unused for 30 days or greater are disabled",
desc="Ensure credentials unused for 30 days or greater are disabled",
supporting_data={
"Risk": "Risk associated.",
"Notes": "additional information",
},
remediation=Remediation_OCSF(
kb_articles=[
"code or URL to the code location.",
"code or URL to the code location.",
"cli command or URL to the cli command location.",
"cli command or URL to the cli command location.",
"https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html",
],
desc="Run sudo yum update and cross your fingers and toes.",
),
types=["Software and Configuration Checks"],
src_url="https://serviceofficialsiteorpageforthissubject",
uid="prowler-aws-iam_disable_30_days_credentials-123456789012-eu-west-1-test-resource",
related_events=[
"othercheck1",
"othercheck2",
"othercheck3",
"othercheck4",
],
),
resources=[
Resources(
group=Group(name="iam"),
region="eu-west-1",
name="test-resource",
uid="test-arn",
labels=[],
type="AwsIamAccessAnalyzer",
details="Test resource details",
)
],
status_detail="This is a test",
compliance=Compliance_OCSF(
status="Success", requirements=[], status_detail="This is a test"
),
message="This is a test",
severity_id=2,
severity="Low",
cloud=Cloud(
account=Account(name="", uid="123456789012"),
region="eu-west-1",
org=Organization(uid="", name=""),
provider="aws",
project_uid="",
),
time=timestamp.isoformat(),
metadata=Metadata(
original_time=timestamp.isoformat(),
profiles=["default"],
product=Product(
language="en",
name="Prowler",
version="3.6.1",
vendor_name="Prowler/ProwlerPro",
feature=Feature(
name="iam_disable_30_days_credentials",
uid="iam_disable_30_days_credentials",
version="3.6.1",
),
),
version="1.0.0-rc.3",
),
state_id=0,
state="New",
status_id=1,
status="Success",
type_uid=200101,
type_name="Security Finding: Create",
impact_id=0,
impact="Unknown",
confidence_id=0,
confidence="Unknown",
activity_id=1,
activity_name="Create",
category_uid=2,
category_name="Findings",
class_uid=2001,
class_name="Security Finding",
)
output_options = mock.MagicMock()
assert fill_json_ocsf(input_audit_info, finding, output_options) == expected
@mock_s3
def test_send_to_s3_bucket(self):
# Create mock session
@@ -923,3 +1065,22 @@ class Test_Outputs:
assert generate_json_asff_status("FAIL") == "FAILED"
assert generate_json_asff_status("WARNING") == "WARNING"
assert generate_json_asff_status("SOMETHING ELSE") == "NOT_AVAILABLE"
def test_generate_json_ocsf_status(self):
assert generate_json_ocsf_status("PASS") == "Success"
assert generate_json_ocsf_status("FAIL") == "Failure"
assert generate_json_ocsf_status("WARNING") == "Other"
assert generate_json_ocsf_status("SOMETHING ELSE") == "Unknown"
def test_generate_json_ocsf_status_id(self):
assert generate_json_ocsf_status_id("PASS") == 1
assert generate_json_ocsf_status_id("FAIL") == 2
assert generate_json_ocsf_status_id("WARNING") == 99
assert generate_json_ocsf_status_id("SOMETHING ELSE") == 0
def test_generate_json_ocsf_severity_id(self):
assert generate_json_ocsf_severity_id("low") == 2
assert generate_json_ocsf_severity_id("medium") == 3
assert generate_json_ocsf_severity_id("high") == 4
assert generate_json_ocsf_severity_id("critical") == 5
assert generate_json_ocsf_severity_id("something else") == 0
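The new tests cover fill_json_ocsf end to end plus each of the status and severity mapping helpers. They can be run in isolation with pytest's keyword filter; the test module path below is an assumption about the repository layout, not something stated in this diff:

# Illustrative invocation; the test file path is assumed, adjust to the repo layout.
import pytest

pytest.main(["-v", "-k", "ocsf or asff", "tests/lib/outputs/outputs_test.py"])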