diff --git a/prowler/lib/check/check.py b/prowler/lib/check/check.py index ce5e23cf..5f24e87b 100644 --- a/prowler/lib/check/check.py +++ b/prowler/lib/check/check.py @@ -107,14 +107,20 @@ def exclude_services_to_run( # Load checks from checklist.json def parse_checks_from_file(input_file: str, provider: str) -> set: - checks_to_execute = set() - with open_file(input_file) as f: - json_file = parse_json_file(f) + """parse_checks_from_file returns a set of checks read from the given file""" + try: + checks_to_execute = set() + with open_file(input_file) as f: + json_file = parse_json_file(f) - for check_name in json_file[provider]: - checks_to_execute.add(check_name) + for check_name in json_file[provider]: + checks_to_execute.add(check_name) - return checks_to_execute + return checks_to_execute + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}" + ) # Load checks from custom folder @@ -310,7 +316,7 @@ def print_checks( def parse_checks_from_compliance_framework( compliance_frameworks: list, bulk_compliance_frameworks: dict ) -> list: - """Parse checks from compliance frameworks specification""" + """parse_checks_from_compliance_framework returns a set of checks from the given compliance_frameworks""" checks_to_execute = set() try: for framework in compliance_frameworks: @@ -607,22 +613,32 @@ def update_audit_metadata( ) -def recover_checks_from_service(service_list: list, provider: str) -> list: - checks = set() - service_list = [ - "awslambda" if service == "lambda" else service for service in service_list - ] - for service in service_list: - modules = recover_checks_from_provider(provider, service) - if not modules: - logger.error(f"Service '{service}' does not have checks.") +def recover_checks_from_service(service_list: list, provider: str) -> set: + """ + Recover all checks from the selected provider and service - else: - for check_module in modules: - # Recover check name and module name from import path - # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" - check_name = check_module[0].split(".")[-1] - # If the service is present in the group list passed as parameters - # if service_name in group_list: checks_from_arn.add(check_name) - checks.add(check_name) - return checks + Returns a set of checks from the given services + """ + try: + checks = set() + service_list = [ + "awslambda" if service == "lambda" else service for service in service_list + ] + for service in service_list: + service_checks = recover_checks_from_provider(provider, service) + if not service_checks: + logger.error(f"Service '{service}' does not have checks.") + + else: + for check in service_checks: + # Recover check name and module name from import path + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = check[0].split(".")[-1] + # If the service is present in the group list passed as parameters + # if service_name in group_list: checks_from_arn.add(check_name) + checks.add(check_name) + return checks + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) diff --git a/prowler/lib/check/checks_loader.py b/prowler/lib/check/checks_loader.py index c82a601e..f1e8156e 100644 --- a/prowler/lib/check/checks_loader.py +++ b/prowler/lib/check/checks_loader.py @@ -22,73 +22,99 @@ def load_checks_to_execute( categories: set, provider: str, ) -> set: - """Generate the list of checks to execute based on the cloud provider and input arguments specified""" - checks_to_execute = set() + """Generate the list of checks to execute based on the cloud provider and the input arguments given""" + try: + # Local subsets + checks_to_execute = set() + check_aliases = {} + check_severities = { + "critical": [], + "high": [], + "medium": [], + "low": [], + "informational": [], + } + check_categories = {} - # Handle if there are checks passed using -c/--checks - if check_list: - for check_name in check_list: - checks_to_execute.add(check_name) + # First, loop over the bulk_checks_metadata to extract the needed subsets + for check, metadata in bulk_checks_metadata.items(): + # Aliases + for alias in metadata.CheckAliases: + check_aliases[alias] = check - # Handle if there are some severities passed using --severity - elif severities: - for check in bulk_checks_metadata: - # Check check's severity - if bulk_checks_metadata[check].Severity in severities: - checks_to_execute.add(check) - if service_list: - checks_to_execute = ( - recover_checks_from_service(service_list, provider) & checks_to_execute - ) + # Severities + if metadata.Severity: + check_severities[metadata.Severity].append(check) - # Handle if there are checks passed using -C/--checks-file - elif checks_file: - try: + # Categories + for category in metadata.Categories: + if category not in check_categories: + check_categories[category] = [] + check_categories[category].append(check) + + # Handle if there are checks passed using -c/--checks + if check_list: + for check_name in check_list: + checks_to_execute.add(check_name) + + # Handle if there are some severities passed using --severity + elif severities: + for severity in severities: + checks_to_execute.update(check_severities[severity]) + + if service_list: + checks_to_execute = ( + recover_checks_from_service(service_list, provider) + & checks_to_execute + ) + + # Handle if there are checks passed using -C/--checks-file + elif checks_file: checks_to_execute = parse_checks_from_file(checks_file, provider) - except Exception as e: - logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}") - # Handle if there are services passed using -s/--services - elif service_list: - checks_to_execute = recover_checks_from_service(service_list, provider) + # Handle if there are services passed using -s/--services + elif service_list: + checks_to_execute = recover_checks_from_service(service_list, provider) - # Handle if there are compliance frameworks passed using --compliance - elif compliance_frameworks: - try: + # Handle if there are compliance frameworks passed using --compliance + elif compliance_frameworks: checks_to_execute = parse_checks_from_compliance_framework( compliance_frameworks, bulk_compliance_frameworks ) - except Exception as e: - logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}") - # Handle if there are categories passed using --categories - elif categories: - for cat in categories: - for check in bulk_checks_metadata: - # Check check's categories - if cat in bulk_checks_metadata[check].Categories: - checks_to_execute.add(check) + # Handle if there are categories passed using --categories + elif categories: + for category in categories: + checks_to_execute.update(check_categories[category]) - # If there are no checks passed as argument - else: - try: + # If there are no checks passed as argument + else: # Get all check modules to run with the specific provider checks = recover_checks_from_provider(provider) - except Exception as e: - logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}") - else: + for check_info in checks: # Recover check name from import path (last part) # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" check_name = check_info[0] checks_to_execute.add(check_name) - # Get Check Aliases mapping - check_aliases = {} - for check, metadata in bulk_checks_metadata.items(): - for alias in metadata.CheckAliases: - check_aliases[alias] = check + # Check Aliases + checks_to_execute = update_checks_to_execute_with_aliases( + checks_to_execute, check_aliases + ) + return checks_to_execute + + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}" + ) + + +def update_checks_to_execute_with_aliases( + checks_to_execute: set, check_aliases: dict +) -> set: + """update_checks_to_execute_with_aliases returns the checks_to_execute updated using the check aliases.""" # Verify if any input check is an alias of another check for input_check in checks_to_execute: if ( @@ -101,5 +127,4 @@ def load_checks_to_execute( print( f"\nUsing alias {Fore.YELLOW}{input_check}{Style.RESET_ALL} for check {Fore.YELLOW}{check_aliases[input_check]}{Style.RESET_ALL}...\n" ) - return checks_to_execute diff --git a/tests/lib/check/check_loader_test.py b/tests/lib/check/check_loader_test.py new file mode 100644 index 00000000..8d9f351f --- /dev/null +++ b/tests/lib/check/check_loader_test.py @@ -0,0 +1,319 @@ +from mock import patch + +from prowler.lib.check.checks_loader import ( + load_checks_to_execute, + update_checks_to_execute_with_aliases, +) +from prowler.lib.check.models import ( + Check_Metadata_Model, + Code, + Recommendation, + Remediation, +) + +S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME = "s3_bucket_level_public_access_block" +S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_CUSTOM_ALIAS = ( + "s3_bucket_level_public_access_block" +) +S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY = "medium" +S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE = "s3" + + +class TestCheckLoader: + provider = "aws" + + def get_custom_check_metadata(self): + return Check_Metadata_Model( + Provider="aws", + CheckID=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME, + CheckTitle="Check S3 Bucket Level Public Access Block.", + CheckType=["Data Protection"], + CheckAliases=[S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_CUSTOM_ALIAS], + ServiceName=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE, + SubServiceName="", + ResourceIdTemplate="arn:partition:s3:::bucket_name", + Severity=S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY, + ResourceType="AwsS3Bucket", + Description="Check S3 Bucket Level Public Access Block.", + Risk="Public access policies may be applied to sensitive data buckets.", + RelatedUrl="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html", + Remediation=Remediation( + Code=Code( + NativeIaC="", + Terraform="https://docs.bridgecrew.io/docs/bc_aws_s3_20#terraform", + CLI="aws s3api put-public-access-block --region --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true --bucket ", + Other="https://github.com/cloudmatos/matos/tree/master/remediations/aws/s3/s3/block-public-access", + ), + Recommendation=Recommendation( + Text="You can enable Public Access Block at the bucket level to prevent the exposure of your data stored in S3.", + Url="https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html", + ), + ), + Categories=["internet-exposed"], + DependsOn=[], + RelatedTo=[], + Notes="", + Compliance=[], + ) + + def test_load_checks_to_execute(self): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = None + service_list = None + severities = None + compliance_frameworks = None + categories = None + + with patch( + "prowler.lib.check.checks_loader.recover_checks_from_provider", + return_value=[ + ( + f"{S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}", + "path/to/{S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}", + ) + ], + ): + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_check_list(self): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME] + service_list = None + severities = None + compliance_frameworks = None + categories = None + + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_severities(self): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = None + severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY] + compliance_frameworks = None + categories = None + + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_severities_and_services(self): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE] + severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY] + compliance_frameworks = None + categories = None + + with patch( + "prowler.lib.check.checks_loader.recover_checks_from_service", + return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}, + ): + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_severities_and_services_not_within_severity( + self, + ): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = ["ec2"] + severities = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY] + compliance_frameworks = None + categories = None + + with patch( + "prowler.lib.check.checks_loader.recover_checks_from_service", + return_value={"ec2_ami_public"}, + ): + assert set() == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_checks_file( + self, + ): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = "path/to/test_file" + check_list = [] + service_list = [] + severities = [] + compliance_frameworks = None + categories = None + + with patch( + "prowler.lib.check.checks_loader.parse_checks_from_file", + return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}, + ): + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_service_list( + self, + ): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = [S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME_SERVICE] + severities = [] + compliance_frameworks = None + categories = None + + with patch( + "prowler.lib.check.checks_loader.recover_checks_from_service", + return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}, + ): + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_compliance_frameworks( + self, + ): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = [] + severities = [] + compliance_frameworks = ["test-compliance-framework"] + categories = None + + with patch( + "prowler.lib.check.checks_loader.parse_checks_from_compliance_framework", + return_value={S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME}, + ): + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_load_checks_to_execute_with_categories( + self, + ): + bulk_checks_metatada = { + S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME: self.get_custom_check_metadata() + } + bulk_compliance_frameworks = None + checks_file = None + check_list = [] + service_list = [] + severities = [] + compliance_frameworks = [] + categories = {"internet-exposed"} + + assert {S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME} == load_checks_to_execute( + bulk_checks_metatada, + bulk_compliance_frameworks, + checks_file, + check_list, + service_list, + severities, + compliance_frameworks, + categories, + self.provider, + ) + + def test_update_checks_to_execute_with_aliases(self): + checks_to_execute = {"renamed_check"} + check_aliases = {"renamed_check": "check_name"} + assert {"check_name"} == update_checks_to_execute_with_aliases( + checks_to_execute, check_aliases + ) diff --git a/tests/lib/check/check_test.py b/tests/lib/check/check_test.py index 02d3b1b3..e115c509 100644 --- a/tests/lib/check/check_test.py +++ b/tests/lib/check/check_test.py @@ -3,7 +3,7 @@ import pathlib from importlib.machinery import FileFinder from pkgutil import ModuleInfo -from boto3 import client, session +from boto3 import client from fixtures.bulk_checks_metadata import test_bulk_checks_metadata from mock import patch from moto import mock_s3 @@ -27,8 +27,7 @@ from prowler.providers.aws.aws_provider import ( get_checks_from_input_arn, get_regions_from_audit_resources, ) -from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info -from prowler.providers.common.models import Audit_Metadata +from tests.providers.aws.audit_info_utils import set_mocked_aws_audit_info AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION = "us-east-1" @@ -258,36 +257,6 @@ def mock_recover_checks_from_aws_provider_rds_service(*_): class Test_Check: - def set_mocked_audit_info(self): - audit_info = AWS_Audit_Info( - session_config=None, - original_session=None, - audit_session=session.Session( - profile_name=None, - botocore_session=None, - ), - audited_account=AWS_ACCOUNT_NUMBER, - audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root", - audited_user_id=None, - audited_partition="aws", - audited_identity_arn=None, - profile=None, - profile_region=None, - credentials=None, - assumed_role_info=None, - audited_regions=None, - organizations_metadata=None, - audit_resources=None, - mfa_enabled=False, - audit_metadata=Audit_Metadata( - services_scanned=0, - expected_checks=[], - completed_checks=0, - audit_progress=0, - ), - ) - return audit_info - def test_load_check_metadata(self): test_cases = [ { @@ -363,7 +332,7 @@ class Test_Check: provider = test["input"]["provider"] assert ( parse_checks_from_folder( - self.set_mocked_audit_info(), check_folder, provider + set_mocked_aws_audit_info(), check_folder, provider ) == test["expected"] )