feat(custom_checks_metadata): Add checks metadata overide for severity (#3038)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Vajrala Venkateswarlu
2023-11-20 15:14:47 +05:30
committed by GitHub
parent 43c96a7875
commit 0a305c281f
11 changed files with 364 additions and 2 deletions

View File

@@ -0,0 +1,43 @@
# Custom Checks Metadata
In certain organizations, the severity of specific checks might differ from the default values defined in the check's metadata. For instance, while `s3_bucket_level_public_access_block` could be deemed `critical` for some organizations, others might assign a different severity level.
The custom metadata option offers a means to override default metadata set by Prowler
You can utilize `--custom-checks-metadata-file` followed by the path to your custom checks metadata YAML file.
## Available Fields
The list of supported check's metadata fields that can be override are listed as follows:
- Severity
## File Syntax
This feature is available for all the providers supported in Prowler since the metadata format is common between all the providers. The following is the YAML format for the custom checks metadata file:
```yaml title="custom_checks_metadata.yaml"
CustomChecksMetadata:
aws:
Checks:
s3_bucket_level_public_access_block:
Severity: high
s3_bucket_no_mfa_delete:
Severity: high
azure:
Checks:
storage_infrastructure_encryption_is_enabled:
Severity: medium
gcp:
Checks:
compute_instance_public_ip:
Severity: critical
```
## Usage
Executing the following command will assess all checks and generate a report while overriding the metadata for those checks:
```sh
prowler <provider> --custom-checks-metadata-file <path/to/custom/metadata>
```
This customization feature enables organizations to tailor the severity of specific checks based on their unique requirements, providing greater flexibility in security assessment and reporting.

View File

@@ -38,6 +38,7 @@ nav:
- Logging: tutorials/logging.md
- Allowlist: tutorials/allowlist.md
- Check Aliases: tutorials/check-aliases.md
- Custom Metadata: tutorials/custom-checks-metadata.md
- Ignore Unused Services: tutorials/ignore-unused-services.md
- Pentesting: tutorials/pentesting.md
- Developer Guide: developer-guide/introduction.md

View File

@@ -26,6 +26,10 @@ from prowler.lib.check.check import (
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.check.custom_checks_metadata import (
parse_custom_checks_metadata_file,
update_checks_metadata,
)
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.compliance import display_compliance_table
@@ -68,6 +72,7 @@ def prowler():
checks_folder = args.checks_folder
severities = args.severity
compliance_framework = args.compliance
custom_checks_metadata_file = args.custom_checks_metadata_file
if not args.no_banner:
print_banner(args)
@@ -97,9 +102,19 @@ def prowler():
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
# Complete checks metadata with the compliance framework specification
update_checks_metadata_with_compliance(
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
# Update checks metadata if the --custom-checks-metadata-file is present
custom_checks_metadata = None
if custom_checks_metadata_file:
custom_checks_metadata = parse_custom_checks_metadata_file(
provider, custom_checks_metadata_file
)
bulk_checks_metadata = update_checks_metadata(
bulk_checks_metadata, custom_checks_metadata
)
if args.list_compliance:
print_compliance_frameworks(bulk_compliance_frameworks)
sys.exit()
@@ -175,7 +190,11 @@ def prowler():
findings = []
if len(checks_to_execute):
findings = execute_checks(
checks_to_execute, provider, audit_info, audit_output_options
checks_to_execute,
provider,
audit_info,
audit_output_options,
custom_checks_metadata,
)
else:
logger.error(

View File

@@ -0,0 +1,15 @@
CustomChecksMetadata:
aws:
Checks:
s3_bucket_level_public_access_block:
Severity: high
s3_bucket_no_mfa_delete:
Severity: high
azure:
Checks:
storage_infrastructure_encryption_is_enabled:
Severity: medium
gcp:
Checks:
compute_instance_public_ip:
Severity: critical

View File

@@ -16,6 +16,7 @@ from colorama import Fore, Style
import prowler
from prowler.config.config import orange_color
from prowler.lib.check.compliance_models import load_compliance_framework
from prowler.lib.check.custom_checks_metadata import update_check_metadata
from prowler.lib.check.models import Check, load_check_metadata
from prowler.lib.logger import logger
from prowler.lib.outputs.outputs import report
@@ -416,6 +417,7 @@ def execute_checks(
provider: str,
audit_info: Any,
audit_output_options: Provider_Output_Options,
custom_checks_metadata: Any,
) -> list:
# List to store all the check's findings
all_findings = []
@@ -461,6 +463,7 @@ def execute_checks(
audit_info,
services_executed,
checks_executed,
custom_checks_metadata,
)
all_findings.extend(check_findings)
@@ -506,6 +509,7 @@ def execute_checks(
audit_info,
services_executed,
checks_executed,
custom_checks_metadata,
)
all_findings.extend(check_findings)
@@ -531,6 +535,7 @@ def execute(
audit_info: Any,
services_executed: set,
checks_executed: set,
custom_checks_metadata: Any,
):
# Import check module
check_module_path = (
@@ -541,6 +546,10 @@ def execute(
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Update check metadata to reflect that in the outputs
if custom_checks_metadata and custom_checks_metadata["Checks"].get(c.CheckID):
c = update_check_metadata(c, custom_checks_metadata["Checks"][c.CheckID])
# Run check
check_findings = run_check(c, audit_output_options)

View File

@@ -0,0 +1,77 @@
import sys
import yaml
from jsonschema import validate
from prowler.lib.logger import logger
valid_severities = ["critical", "high", "medium", "low", "informational"]
custom_checks_metadata_schema = {
"type": "object",
"properties": {
"Checks": {
"type": "object",
"patternProperties": {
".*": {
"type": "object",
"properties": {
"Severity": {
"type": "string",
"enum": valid_severities,
}
},
"required": ["Severity"],
"additionalProperties": False,
}
},
"additionalProperties": False,
}
},
"required": ["Checks"],
"additionalProperties": False,
}
def parse_custom_checks_metadata_file(provider: str, parse_custom_checks_metadata_file):
"""parse_custom_checks_metadata_file returns the custom_checks_metadata object if it is valid, otherwise aborts the execution returning the ValidationError."""
try:
with open(parse_custom_checks_metadata_file) as f:
custom_checks_metadata = yaml.safe_load(f)["CustomChecksMetadata"][provider]
validate(custom_checks_metadata, schema=custom_checks_metadata_schema)
return custom_checks_metadata
except Exception as error:
logger.critical(
f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]"
)
sys.exit(1)
def update_checks_metadata(bulk_checks_metadata, custom_checks_metadata):
"""update_checks_metadata returns the bulk_checks_metadata with the check's metadata updated based on the custom_checks_metadata provided."""
try:
# Update checks metadata from CustomChecksMetadata file
for check, custom_metadata in custom_checks_metadata["Checks"].items():
check_metadata = bulk_checks_metadata.get(check)
if check_metadata:
bulk_checks_metadata[check] = update_check_metadata(
check_metadata, custom_metadata
)
return bulk_checks_metadata
except Exception as error:
logger.critical(
f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]"
)
sys.exit(1)
def update_check_metadata(check_metadata, custom_metadata):
"""update_check_metadata updates the check_metadata fields present in the custom_metadata and returns the updated version of the check_metadata. If some field is not present or valid the check_metadata is returned with the original fields."""
try:
if custom_metadata:
for attribute in custom_metadata:
try:
setattr(check_metadata, attribute, custom_metadata[attribute])
except ValueError:
pass
finally:
return check_metadata

View File

@@ -49,6 +49,7 @@ Detailed documentation at https://docs.prowler.cloud
self.__init_exclude_checks_parser__()
self.__init_list_checks_parser__()
self.__init_config_parser__()
self.__init_custom_checks_metadata_parser__()
# Init Providers Arguments
init_providers_parser(self)
@@ -286,3 +287,15 @@ Detailed documentation at https://docs.prowler.cloud
default=default_config_file_path,
help="Set configuration file path",
)
def __init_custom_checks_metadata_parser__(self):
# CustomChecksMetadata
custom_checks_metadata_subparser = (
self.common_providers_parser.add_argument_group("Custom Checks Metadata")
)
custom_checks_metadata_subparser.add_argument(
"--custom-checks-metadata-file",
nargs="?",
default=None,
help="Path for the custom checks metadata YAML file. See example prowler/config/custom_checks_metadata_example.yaml for reference and format. See more in https://docs.prowler.cloud/en/latest/tutorials/custom-checks-metadata/",
)

View File

@@ -126,6 +126,7 @@ def init_parser(self):
default=None,
help="Path for allowlist yaml file. See example prowler/config/aws_allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table or Lambda ARNs or S3 URIs, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
)
# Based Scans
aws_based_scans_subparser = aws_parser.add_argument_group("AWS Based Scans")
aws_based_scans_parser = aws_based_scans_subparser.add_mutually_exclusive_group()

View File

@@ -0,0 +1,164 @@
import logging
import os
import pytest
from prowler.lib.check.custom_checks_metadata import (
parse_custom_checks_metadata_file,
update_check_metadata,
update_checks_metadata,
)
from prowler.lib.check.models import (
Check_Metadata_Model,
Code,
Recommendation,
Remediation,
)
CUSTOM_CHECKS_METADATA_FIXTURE_FILE = f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/custom_checks_metadata_example.yaml"
CUSTOM_CHECKS_METADATA_FIXTURE_FILE_NOT_VALID = f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/custom_checks_metadata_example_not_valid.yaml"
AWS_PROVIDER = "aws"
AZURE_PROVIDER = "azure"
GCP_PROVIDER = "gcp"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME = "s3_bucket_level_public_access_block"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY = "medium"
class TestCustomChecksMetadata:
def get_custom_check_metadata(self):
return Check_Metadata_Model(
Provider="aws",
CheckID=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME,
CheckTitle="Check S3 Bucket Level Public Access Block.",
CheckType=["Data Protection"],
CheckAliases=[],
ServiceName="s3",
SubServiceName="",
ResourceIdTemplate="arn:partition:s3:::bucket_name",
Severity=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY,
ResourceType="AwsS3Bucket",
Description="Check S3 Bucket Level Public Access Block.",
Risk="Public access policies may be applied to sensitive data buckets.",
RelatedUrl="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html",
Remediation=Remediation(
Code=Code(
NativeIaC="",
Terraform="https://docs.bridgecrew.io/docs/bc_aws_s3_20#terraform",
CLI="aws s3api put-public-access-block --region <REGION_NAME> --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true --bucket <BUCKET_NAME>",
Other="https://github.com/cloudmatos/matos/tree/master/remediations/aws/s3/s3/block-public-access",
),
Recommendation=Recommendation(
Text="You can enable Public Access Block at the bucket level to prevent the exposure of your data stored in S3.",
Url="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html",
),
),
Categories=[],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
def test_parse_custom_checks_metadata_file_for_aws(self):
assert parse_custom_checks_metadata_file(
AWS_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE
) == {
"Checks": {
"s3_bucket_level_public_access_block": {"Severity": "high"},
"s3_bucket_no_mfa_delete": {"Severity": "high"},
}
}
def test_parse_custom_checks_metadata_file_for_azure(self):
assert parse_custom_checks_metadata_file(
AZURE_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE
) == {"Checks": {"sqlserver_auditing_enabled": {"Severity": "high"}}}
def test_parse_custom_checks_metadata_file_for_gcp(self):
assert parse_custom_checks_metadata_file(
GCP_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE
) == {"Checks": {"bigquery_dataset_cmk_encryption": {"Severity": "low"}}}
def test_parse_custom_checks_metadata_file_for_aws_validation_error(self, caplog):
caplog.set_level(logging.CRITICAL)
with pytest.raises(SystemExit) as error:
parse_custom_checks_metadata_file(
AWS_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE_NOT_VALID
)
assert error.type == SystemExit
assert error.value.code == 1
assert "'Checks' is a required property" in caplog.text
def test_update_checks_metadata(self):
updated_severity = "high"
bulk_checks_metadata = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata(),
}
custom_checks_metadata = {
"Checks": {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: {
"Severity": updated_severity
},
}
}
bulk_checks_metadata_updated = update_checks_metadata(
bulk_checks_metadata, custom_checks_metadata
).get(S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME)
assert bulk_checks_metadata_updated.Severity == updated_severity
def test_update_checks_metadata_not_present_field(self):
bulk_checks_metadata = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata(),
}
custom_checks_metadata = {
"Checks": {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: {
"RandomField": "random_value"
},
}
}
bulk_checks_metadata_updated = update_checks_metadata(
bulk_checks_metadata, custom_checks_metadata
).get(S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME)
assert (
bulk_checks_metadata_updated.Severity
== S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY
)
def test_update_check_metadata(self):
updated_severity = "high"
custom_checks_metadata = {"Severity": updated_severity}
check_metadata_updated = update_check_metadata(
self.get_custom_check_metadata(), custom_checks_metadata
)
assert check_metadata_updated.Severity == updated_severity
def test_update_check_metadata_not_present_field(self):
custom_checks_metadata = {"RandomField": "random_value"}
check_metadata_updated = update_check_metadata(
self.get_custom_check_metadata(), custom_checks_metadata
)
assert (
check_metadata_updated.Severity
== S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY
)
def test_update_check_metadata_none_custom_metadata(self):
custom_checks_metadata = None
check_metadata_updated = update_check_metadata(
self.get_custom_check_metadata(), custom_checks_metadata
)
assert (
check_metadata_updated.Severity
== S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY
)

View File

@@ -0,0 +1,15 @@
CustomChecksMetadata:
aws:
Checks:
s3_bucket_level_public_access_block:
Severity: high
s3_bucket_no_mfa_delete:
Severity: high
azure:
Checks:
sqlserver_auditing_enabled:
Severity: high
gcp:
Checks:
bigquery_dataset_cmk_encryption:
Severity: low

View File

@@ -0,0 +1,5 @@
CustomChecksMetadata:
aws:
Check:
s3_bucket_level_public_access_block:
Severity: high