feat(custom checks): add -x/--checks-folder for custom checks (#2191)

This commit is contained in:
Sergio Garcia
2023-04-13 13:44:25 +02:00
committed by GitHub
parent 25630f1ef5
commit 4da6d152c3
12 changed files with 244 additions and 42 deletions

View File

@@ -51,7 +51,21 @@ prowler <provider> -e/--excluded-checks ec2 rds
```console
prowler <provider> -C/--checks-file <checks_list>.json
```
## Custom Checks
Prowler allows you to include your custom checks with the flag:
```console
prowler <provider> -x/--checks-folder <custom_checks_folder>
```
> S3 URIs are also supported as folders for custom checks, e.g. s3://bucket/prefix/checks_folder/. Make sure that the used credentials have s3:GetObject permissions in the S3 path where the custom checks are located.
The custom checks folder must contain one subfolder per check, each subfolder must be named as the check and must contain:
- An empty `__init__.py`: to make Python treat this check folder as a package.
- A `check_name.py` containing the check's logic.
- A `check_name.metadata.json` containing the check's metadata.
>The check name must start with the service name followed by an underscore (e.g., ec2_instance_public_ip).
To see more information about how to write checks see the [Developer Guide](../developer-guide/#create-a-new-check-for-a-provider).
## Severities
Each of Prowler's checks has a severity, which can be:
- informational

View File

@@ -12,11 +12,13 @@ from prowler.lib.check.check import (
execute_checks,
list_categories,
list_services,
parse_checks_from_folder,
print_categories,
print_checks,
print_compliance_frameworks,
print_compliance_requirements,
print_services,
remove_custom_checks_module,
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
@@ -52,9 +54,17 @@ def prowler():
services = args.services
categories = args.categories
checks_file = args.checks_file
checks_folder = args.checks_folder
severities = args.severity
compliance_framework = args.compliance
# Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__)
# Import custom checks from folder
if checks_folder:
parse_checks_from_folder(audit_info, checks_folder, provider)
# We treat the compliance framework as another output format
if compliance_framework:
args.output_modes.extend(compliance_framework)
@@ -126,9 +136,6 @@ def prowler():
print_checks(provider, checks_to_execute, bulk_checks_metadata)
sys.exit()
# Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__)
# Once the audit_info is set and we have the eventual checks based on the resource identifier,
# it is time to check what Prowler's checks are going to be executed
if audit_info.audit_resources:
@@ -217,6 +224,10 @@ def prowler():
audit_output_options.output_directory,
)
# If custom checks were passed, remove the modules
if checks_folder:
remove_custom_checks_module(checks_folder, provider)
# If there are failed findings exit code 3, except if -z is input
if not args.ignore_exit_code_3 and stats["total_fail"] > 0:
sys.exit(3)

View File

@@ -1,6 +1,8 @@
import functools
import importlib
import os
import re
import shutil
import sys
import traceback
from pkgutil import walk_packages
@@ -24,6 +26,7 @@ except KeyError:
except Exception:
sys.exit(1)
import prowler
from prowler.lib.utils.utils import open_file, parse_json_file
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.outputs import Provider_Output_Options
@@ -117,6 +120,66 @@ def parse_checks_from_file(input_file: str, provider: str) -> set:
return checks_to_execute
# Load checks from custom folder
def parse_checks_from_folder(audit_info, input_folder: str, provider: str) -> int:
try:
imported_checks = 0
# Check if input folder is a S3 URI
if provider == "aws" and re.search(
"^s3://([^/]+)/(.*?([^/]+))/$", input_folder
):
bucket = input_folder.split("/")[2]
key = ("/").join(input_folder.split("/")[3:])
s3_reource = audit_info.audit_session.resource("s3")
bucket = s3_reource.Bucket(bucket)
for obj in bucket.objects.filter(Prefix=key):
if not os.path.exists(os.path.dirname(obj.key)):
os.makedirs(os.path.dirname(obj.key))
bucket.download_file(obj.key, obj.key)
input_folder = key
# Import custom checks by moving the checks folders to the corresponding services
with os.scandir(input_folder) as checks:
for check in checks:
if check.is_dir():
check_module = input_folder + "/" + check.name
# Copy checks to specific provider/service folder
check_service = check.name.split("_")[0]
prowler_dir = prowler.__path__
prowler_module = f"{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}"
if os.path.exists(prowler_module):
shutil.rmtree(prowler_module)
shutil.copytree(check_module, prowler_module)
imported_checks += 1
return imported_checks
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
# Load checks from custom folder
def remove_custom_checks_module(input_folder: str, provider: str):
# Check if input folder is a S3 URI
s3_uri = False
if provider == "aws" and re.search("^s3://([^/]+)/(.*?([^/]+))/$", input_folder):
input_folder = ("/").join(input_folder.split("/")[3:])
s3_uri = True
with os.scandir(input_folder) as checks:
for check in checks:
if check.is_dir():
# Remove imported checks
check_service = check.name.split("_")[0]
prowler_dir = prowler.__path__
prowler_module = f"{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}"
if os.path.exists(prowler_module):
shutil.rmtree(prowler_module)
# If S3 URI, remove the downloaded folders
if s3_uri and os.path.exists(input_folder):
shutil.rmtree(input_folder)
def list_services(provider: str) -> set():
available_services = set()
checks_tuple = recover_checks_from_provider(provider)

View File

@@ -226,6 +226,12 @@ Detailed documentation at https://docs.prowler.cloud
default=[],
# Pending validate choices
)
common_checks_parser.add_argument(
"-x",
"--checks-folder",
nargs="?",
help="Specify external directory with custom checks (each check must have a folder with the required files, see more in https://docs.prowler.cloud/en/latest/tutorials/misc/#custom-checks).",
)
def __init_list_checks_parser__(self):
# List checks options

View File

@@ -211,31 +211,37 @@ def add_html_header(file_descriptor, audit_info):
def fill_html(file_descriptor, finding, output_options):
row_class = "p-3 mb-2 bg-success-custom"
if finding.status == "INFO":
row_class = "table-info"
elif finding.status == "FAIL":
row_class = "table-danger"
elif finding.status == "WARNING":
row_class = "table-warning"
file_descriptor.write(
f"""
<tr class="{row_class}">
<td>{finding.status}</td>
<td>{finding.check_metadata.Severity}</td>
<td>{finding.check_metadata.ServiceName}</td>
<td>{finding.region}</td>
<td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td>
<td>{finding.check_metadata.CheckTitle}</td>
<td>{finding.resource_id.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td>{parse_html_string(unroll_tags(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td><p class="show-read-more">{finding.check_metadata.Risk}</p></td>
<td><p class="show-read-more">{finding.check_metadata.Remediation.Recommendation.Text}</p> <a class="read-more" href="{finding.check_metadata.Remediation.Recommendation.Url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(get_check_compliance(finding, finding.check_metadata.Provider, output_options)))}</p></td>
</tr>
"""
)
try:
row_class = "p-3 mb-2 bg-success-custom"
if finding.status == "INFO":
row_class = "table-info"
elif finding.status == "FAIL":
row_class = "table-danger"
elif finding.status == "WARNING":
row_class = "table-warning"
file_descriptor.write(
f"""
<tr class="{row_class}">
<td>{finding.status}</td>
<td>{finding.check_metadata.Severity}</td>
<td>{finding.check_metadata.ServiceName}</td>
<td>{finding.region}</td>
<td>{finding.check_metadata.CheckID.replace("_", "<wbr>_")}</td>
<td>{finding.check_metadata.CheckTitle}</td>
<td>{finding.resource_id.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td>{parse_html_string(unroll_tags(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr>_")}</td>
<td><p class="show-read-more">{finding.check_metadata.Risk}</p></td>
<td><p class="show-read-more">{finding.check_metadata.Remediation.Recommendation.Text}</p> <a class="read-more" href="{finding.check_metadata.Remediation.Recommendation.Url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(get_check_compliance(finding, finding.check_metadata.Provider, output_options)))}</p></td>
</tr>
"""
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def fill_html_overview_statistics(stats, output_filename, output_directory):

View File

@@ -12,20 +12,27 @@ from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info
def get_check_compliance(finding, provider, output_options):
check_compliance = {}
# We have to retrieve all the check's compliance requirements
for compliance in output_options.bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
if compliance.Provider == provider.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
try:
check_compliance = {}
# We have to retrieve all the check's compliance requirements
if finding.check_metadata.CheckID in output_options.bulk_checks_metadata:
for compliance in output_options.bulk_checks_metadata[
finding.check_metadata.CheckID
].Compliance:
compliance_fw = compliance.Framework
if compliance.Version:
compliance_fw = f"{compliance_fw}-{compliance.Version}"
if compliance.Provider == provider.upper():
if compliance_fw not in check_compliance:
check_compliance[compliance_fw] = []
for requirement in compliance.Requirements:
check_compliance[compliance_fw].append(requirement.Id)
return check_compliance
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def generate_provider_output_csv(

View File

@@ -3,7 +3,9 @@ import pathlib
from importlib.machinery import FileFinder
from pkgutil import ModuleInfo
from boto3 import client, session
from mock import patch
from moto import mock_s3
from prowler.lib.check.check import (
exclude_checks_to_run,
@@ -11,8 +13,10 @@ from prowler.lib.check.check import (
list_modules,
list_services,
parse_checks_from_file,
parse_checks_from_folder,
recover_checks_from_provider,
recover_checks_from_service,
remove_custom_checks_module,
update_audit_metadata,
)
from prowler.lib.check.models import load_check_metadata
@@ -20,6 +24,10 @@ from prowler.providers.aws.aws_provider import (
get_checks_from_input_arn,
get_regions_from_audit_resources,
)
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
expected_packages = [
ModuleInfo(
@@ -127,6 +135,28 @@ def mock_recover_checks_from_aws_provider_lambda_service(*_):
class Test_Check:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
def test_load_check_metadata(self):
test_cases = [
{
@@ -164,6 +194,50 @@ class Test_Check:
provider = test["input"]["provider"]
assert parse_checks_from_file(check_file, provider) == test["expected"]
@mock_s3
def test_parse_checks_from_folder(self):
test_checks_folder = (
f"{pathlib.Path().absolute()}/tests/lib/check/fixtures/checks_folder"
)
# Create bucket and upload checks folder
s3_client = client("s3", region_name=AWS_REGION)
s3_client.create_bucket(Bucket="test")
# Iterate through the files in the folder and upload each one
for subdir, _, files in os.walk(test_checks_folder):
for file in files:
check = subdir.split("/")[-1]
full_path = os.path.join(subdir, file)
with open(full_path, "rb") as data:
s3_client.upload_fileobj(
data, "test", f"checks_folder/{check}/{file}"
)
test_cases = [
{
"input": {
"path": test_checks_folder,
"provider": "aws",
},
"expected": 3,
},
{
"input": {
"path": "s3://test/checks_folder/",
"provider": "aws",
},
"expected": 3,
},
]
for test in test_cases:
check_folder = test["input"]["path"]
provider = test["input"]["provider"]
assert (
parse_checks_from_folder(
self.set_mocked_audit_info(), check_folder, provider
)
== test["expected"]
)
remove_custom_checks_module(check_folder, provider)
def test_exclude_checks_to_run(self):
test_cases = [
{

View File

@@ -31,6 +31,7 @@ class Test_Parser:
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.checks_folder
assert not parsed.services
assert not parsed.severity
assert not parsed.compliance
@@ -75,6 +76,7 @@ class Test_Parser:
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.checks_folder
assert not parsed.services
assert not parsed.severity
assert not parsed.compliance
@@ -111,6 +113,7 @@ class Test_Parser:
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.checks_folder
assert not parsed.services
assert not parsed.severity
assert not parsed.compliance
@@ -417,6 +420,20 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.checks_file == filename
def test_checks_parser_checks_folder_short(self):
argument = "-x"
filename = "custom-checks-folder/"
command = [prowler_command, argument, filename]
parsed = self.parser.parse(command)
assert parsed.checks_folder == filename
def test_checks_parser_checks_folder_long(self):
argument = "--checks-folder"
filename = "custom-checks-folder/"
command = [prowler_command, argument, filename]
parsed = self.parser.parse(command)
assert parsed.checks_folder == filename
def test_checks_parser_services_short(self):
argument = "-s"
service_1 = "iam"

View File

@@ -807,6 +807,10 @@ class Test_Outputs:
finding.status_extended = "This is a test"
output_options = mock.MagicMock()
output_options.bulk_checks_metadata = {}
output_options.bulk_checks_metadata[
"iam_disable_30_days_credentials"
] = mock.MagicMock()
output_options.bulk_checks_metadata[
"iam_disable_30_days_credentials"
].Compliance = bulk_check_metadata