refactor(load_checks_to_execute): Refactor function and add tests (#3066)

This commit is contained in:
Pepe Fagoaga
2023-11-30 17:41:14 +01:00
committed by GitHub
parent de4166bf0d
commit 6d2b2a9a93
4 changed files with 436 additions and 107 deletions

View File

@@ -107,6 +107,8 @@ def exclude_services_to_run(
# Load checks from checklist.json
def parse_checks_from_file(input_file: str, provider: str) -> set:
"""parse_checks_from_file returns a set of checks read from the given file"""
try:
checks_to_execute = set()
with open_file(input_file) as f:
json_file = parse_json_file(f)
@@ -115,6 +117,10 @@ def parse_checks_from_file(input_file: str, provider: str) -> set:
checks_to_execute.add(check_name)
return checks_to_execute
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
# Load checks from custom folder
@@ -310,7 +316,7 @@ def print_checks(
def parse_checks_from_compliance_framework(
compliance_frameworks: list, bulk_compliance_frameworks: dict
) -> list:
"""Parse checks from compliance frameworks specification"""
"""parse_checks_from_compliance_framework returns a set of checks from the given compliance_frameworks"""
checks_to_execute = set()
try:
for framework in compliance_frameworks:
@@ -607,22 +613,32 @@ def update_audit_metadata(
)
def recover_checks_from_service(service_list: list, provider: str) -> list:
def recover_checks_from_service(service_list: list, provider: str) -> set:
"""
Recover all checks from the selected provider and service
Returns a set of checks from the given services
"""
try:
checks = set()
service_list = [
"awslambda" if service == "lambda" else service for service in service_list
]
for service in service_list:
modules = recover_checks_from_provider(provider, service)
if not modules:
service_checks = recover_checks_from_provider(provider, service)
if not service_checks:
logger.error(f"Service '{service}' does not have checks.")
else:
for check_module in modules:
for check in service_checks:
# Recover check name and module name from import path
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module[0].split(".")[-1]
check_name = check[0].split(".")[-1]
# If the service is present in the group list passed as parameters
# if service_name in group_list: checks_from_arn.add(check_name)
checks.add(check_name)
return checks
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

View File

@@ -22,8 +22,35 @@ def load_checks_to_execute(
categories: set,
provider: str,
) -> set:
"""Generate the list of checks to execute based on the cloud provider and input arguments specified"""
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
# Local subsets
checks_to_execute = set()
check_aliases = {}
check_severities = {
"critical": [],
"high": [],
"medium": [],
"low": [],
"informational": [],
}
check_categories = {}
# First, loop over the bulk_checks_metadata to extract the needed subsets
for check, metadata in bulk_checks_metadata.items():
# Aliases
for alias in metadata.CheckAliases:
check_aliases[alias] = check
# Severities
if metadata.Severity:
check_severities[metadata.Severity].append(check)
# Categories
for category in metadata.Categories:
if category not in check_categories:
check_categories[category] = []
check_categories[category].append(check)
# Handle if there are checks passed using -c/--checks
if check_list:
@@ -32,21 +59,18 @@ def load_checks_to_execute(
# Handle if there are some severities passed using --severity
elif severities:
for check in bulk_checks_metadata:
# Check check's severity
if bulk_checks_metadata[check].Severity in severities:
checks_to_execute.add(check)
for severity in severities:
checks_to_execute.update(check_severities[severity])
if service_list:
checks_to_execute = (
recover_checks_from_service(service_list, provider) & checks_to_execute
recover_checks_from_service(service_list, provider)
& checks_to_execute
)
# Handle if there are checks passed using -C/--checks-file
elif checks_file:
try:
checks_to_execute = parse_checks_from_file(checks_file, provider)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
# Handle if there are services passed using -s/--services
elif service_list:
@@ -54,41 +78,43 @@ def load_checks_to_execute(
# Handle if there are compliance frameworks passed using --compliance
elif compliance_frameworks:
try:
checks_to_execute = parse_checks_from_compliance_framework(
compliance_frameworks, bulk_compliance_frameworks
)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
# Handle if there are categories passed using --categories
elif categories:
for cat in categories:
for check in bulk_checks_metadata:
# Check check's categories
if cat in bulk_checks_metadata[check].Categories:
checks_to_execute.add(check)
for category in categories:
checks_to_execute.update(check_categories[category])
# If there are no checks passed as argument
else:
try:
# Get all check modules to run with the specific provider
checks = recover_checks_from_provider(provider)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
else:
for check_info in checks:
# Recover check name from import path (last part)
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_info[0]
checks_to_execute.add(check_name)
# Get Check Aliases mapping
check_aliases = {}
for check, metadata in bulk_checks_metadata.items():
for alias in metadata.CheckAliases:
check_aliases[alias] = check
# Check Aliases
checks_to_execute = update_checks_to_execute_with_aliases(
checks_to_execute, check_aliases
)
return checks_to_execute
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
def update_checks_to_execute_with_aliases(
checks_to_execute: set, check_aliases: dict
) -> set:
"""update_checks_to_execute_with_aliases returns the checks_to_execute updated using the check aliases."""
# Verify if any input check is an alias of another check
for input_check in checks_to_execute:
if (
@@ -101,5 +127,4 @@ def load_checks_to_execute(
print(
f"\nUsing alias {Fore.YELLOW}{input_check}{Style.RESET_ALL} for check {Fore.YELLOW}{check_aliases[input_check]}{Style.RESET_ALL}...\n"
)
return checks_to_execute

View File

@@ -0,0 +1,319 @@
from mock import patch
from prowler.lib.check.checks_loader import (
load_checks_to_execute,
update_checks_to_execute_with_aliases,
)
from prowler.lib.check.models import (
Check_Metadata_Model,
Code,
Recommendation,
Remediation,
)
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME = "s3_bucket_level_public_access_block"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_CUSTOM_ALIAS = (
"s3_bucket_level_public_access_block"
)
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY = "medium"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE = "s3"
class TestCheckLoader:
provider = "aws"
def get_custom_check_metadata(self):
return Check_Metadata_Model(
Provider="aws",
CheckID=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME,
CheckTitle="Check S3 Bucket Level Public Access Block.",
CheckType=["Data Protection"],
CheckAliases=[S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_CUSTOM_ALIAS],
ServiceName=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE,
SubServiceName="",
ResourceIdTemplate="arn:partition:s3:::bucket_name",
Severity=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY,
ResourceType="AwsS3Bucket",
Description="Check S3 Bucket Level Public Access Block.",
Risk="Public access policies may be applied to sensitive data buckets.",
RelatedUrl="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html",
Remediation=Remediation(
Code=Code(
NativeIaC="",
Terraform="https://docs.bridgecrew.io/docs/bc_aws_s3_20#terraform",
CLI="aws s3api put-public-access-block --region <REGION_NAME> --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true --bucket <BUCKET_NAME>",
Other="https://github.com/cloudmatos/matos/tree/master/remediations/aws/s3/s3/block-public-access",
),
Recommendation=Recommendation(
Text="You can enable Public Access Block at the bucket level to prevent the exposure of your data stored in S3.",
Url="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html",
),
),
Categories=["internet-exposed"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
def test_load_checks_to_execute(self):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = None
service_list = None
severities = None
compliance_frameworks = None
categories = None
with patch(
"prowler.lib.check.checks_loader.recover_checks_from_provider",
return_value=[
(
f"{S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}",
"path/to/{S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}",
)
],
):
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_check_list(self):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME]
service_list = None
severities = None
compliance_frameworks = None
categories = None
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_severities(self):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = None
severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY]
compliance_frameworks = None
categories = None
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_severities_and_services(self):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE]
severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY]
compliance_frameworks = None
categories = None
with patch(
"prowler.lib.check.checks_loader.recover_checks_from_service",
return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME},
):
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_severities_and_services_not_within_severity(
self,
):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = ["ec2"]
severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY]
compliance_frameworks = None
categories = None
with patch(
"prowler.lib.check.checks_loader.recover_checks_from_service",
return_value={"ec2_ami_public"},
):
assert set() == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_checks_file(
self,
):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = "path/to/test_file"
check_list = []
service_list = []
severities = []
compliance_frameworks = None
categories = None
with patch(
"prowler.lib.check.checks_loader.parse_checks_from_file",
return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME},
):
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_service_list(
self,
):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE]
severities = []
compliance_frameworks = None
categories = None
with patch(
"prowler.lib.check.checks_loader.recover_checks_from_service",
return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME},
):
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_compliance_frameworks(
self,
):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = []
severities = []
compliance_frameworks = ["test-compliance-framework"]
categories = None
with patch(
"prowler.lib.check.checks_loader.parse_checks_from_compliance_framework",
return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME},
):
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_load_checks_to_execute_with_categories(
self,
):
bulk_checks_metatada = {
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata()
}
bulk_compliance_frameworks = None
checks_file = None
check_list = []
service_list = []
severities = []
compliance_frameworks = []
categories = {"internet-exposed"}
assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute(
bulk_checks_metatada,
bulk_compliance_frameworks,
checks_file,
check_list,
service_list,
severities,
compliance_frameworks,
categories,
self.provider,
)
def test_update_checks_to_execute_with_aliases(self):
checks_to_execute = {"renamed_check"}
check_aliases = {"renamed_check": "check_name"}
assert {"check_name"} == update_checks_to_execute_with_aliases(
checks_to_execute, check_aliases
)

View File

@@ -3,7 +3,7 @@ import pathlib
from importlib.machinery import FileFinder
from pkgutil import ModuleInfo
from boto3 import client, session
from boto3 import client
from fixtures.bulk_checks_metadata import test_bulk_checks_metadata
from mock import patch
from moto import mock_s3
@@ -27,8 +27,7 @@ from prowler.providers.aws.aws_provider import (
get_checks_from_input_arn,
get_regions_from_audit_resources,
)
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
from tests.providers.aws.audit_info_utils import set_mocked_aws_audit_info
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
@@ -258,36 +257,6 @@ def mock_recover_checks_from_aws_provider_rds_service(*_):
class Test_Check:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
def test_load_check_metadata(self):
test_cases = [
{
@@ -363,7 +332,7 @@ class Test_Check:
provider = test["input"]["provider"]
assert (
parse_checks_from_folder(
self.set_mocked_audit_info(), check_folder, provider
set_mocked_aws_audit_info(), check_folder, provider
)
== test["expected"]
)