From 21f8f56c1874e69eb8b7df6f3d3aa705576a823e Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Tue, 21 Jun 2022 08:05:32 +0200 Subject: [PATCH] feat(exclude-groups-and-services) (#1205) --- groups.json | 4 ++ lib/check/check.py | 38 ++++++++++++++++-- lib/check/check_test.py | 87 ++++++++++++++++++++++++++++++++++++++++- prowler.py | 34 +++++++++++++++- 4 files changed, 157 insertions(+), 6 deletions(-) diff --git a/groups.json b/groups.json index 80ac60fb..f2cf8f1b 100644 --- a/groups.json +++ b/groups.json @@ -3,6 +3,10 @@ "gdpr": [ "check11", "check12" + ], + "iam": [ + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials" ] } } diff --git a/lib/check/check.py b/lib/check/check.py index 8fc51c50..43b76d6a 100644 --- a/lib/check/check.py +++ b/lib/check/check.py @@ -18,6 +18,36 @@ def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set: return checks_to_execute +# Exclude groups to run +def exclude_groups_to_run( + checks_to_execute: set, excluded_groups: list, provider: str +) -> set: + # Recover checks from the input groups + + checks_from_groups = parse_groups_from_file(groups_file, excluded_groups, provider) + for check_name in checks_from_groups: + checks_to_execute.discard(check_name) + return checks_to_execute + + +def exclude_services_to_run( + checks_to_execute: set, excluded_services: list, provider: str +) -> set: + # Recover checks from the input services + for service in excluded_services: + modules = recover_modules_from_provider(provider, service) + if not modules: + logger.error(f"Service '{service}' was not found for the AWS provider") + else: + for check_module in modules: + # Recover check name and module name from import path + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = check_module.split(".")[-1] + # Exclude checks from the input services + checks_to_execute.discard(check_name) + return checks_to_execute + + # Load checks from checklist.json def parse_checks_from_file(input_file: str, provider: str) -> set: checks_to_execute = set() @@ -31,9 +61,9 @@ def parse_checks_from_file(input_file: str, provider: str) -> set: # Load checks from groups.json -def parse_groups_from_file(group_list: list, provider: str) -> set: +def parse_groups_from_file(group_file: str, group_list: list, provider: str) -> set: checks_to_execute = set() - f = open_file(groups_file) + f = open_file(group_file) available_groups = parse_json_file(f) for group in group_list: @@ -89,7 +119,9 @@ def load_checks_to_execute( # Handle if there are groups passed using -g/--groups elif group_list: try: - checks_to_execute = parse_groups_from_file(group_list, provider) + checks_to_execute = parse_groups_from_file( + groups_file, group_list, provider + ) except Exception as e: logger.error(f"{e.__class__.__name__} -- {e}") diff --git a/lib/check/check_test.py b/lib/check/check_test.py index 7fa24d46..cfb35308 100644 --- a/lib/check/check_test.py +++ b/lib/check/check_test.py @@ -2,6 +2,8 @@ import os from lib.check.check import ( exclude_checks_to_run, + exclude_groups_to_run, + exclude_services_to_run, parse_checks_from_file, parse_groups_from_file, ) @@ -37,14 +39,21 @@ class Test_Check: def test_parse_groups_from_file(self): test_cases = [ { - "input": {"groups": ["gdpr"], "provider": "aws"}, + "input": { + "groups": ["gdpr"], + "provider": "aws", + "group_file": f"{os.path.dirname(os.path.realpath(__name__))}/groups.json", + }, "expected": {"check11", "check12"}, } ] for test in test_cases: provider = test["input"]["provider"] groups = test["input"]["groups"] - assert parse_groups_from_file(groups, provider) == test["expected"] + group_file = test["input"]["group_file"] + assert ( + parse_groups_from_file(group_file, groups, provider) == test["expected"] + ) def test_exclude_checks_to_run(self): test_cases = [ @@ -69,3 +78,77 @@ class Test_Check: assert ( exclude_checks_to_run(check_list, excluded_checks) == test["expected"] ) + + def test_exclude_groups_to_run(self): + test_cases = [ + { + "input": { + "excluded_group_list": {"gdpr"}, + "provider": "aws", + "checks_to_run": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + }, + "expected": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + }, + { + "input": { + "excluded_group_list": {"iam"}, + "provider": "aws", + "checks_to_run": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + }, + "expected": set(), + }, + ] + for test in test_cases: + excluded_group_list = test["input"]["excluded_group_list"] + checks_to_run = test["input"]["checks_to_run"] + provider = test["input"]["provider"] + assert ( + exclude_groups_to_run(checks_to_run, excluded_group_list, provider) + == test["expected"] + ) + + def test_exclude_services_to_run(self): + test_cases = [ + { + "input": { + "checks_to_run": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + "excluded_services": {"ec2"}, + "provider": "aws", + }, + "expected": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + }, + { + "input": { + "checks_to_run": { + "iam_disable_30_days_credentials", + "iam_disable_90_days_credentials", + }, + "excluded_services": {"iam"}, + "provider": "aws", + }, + "expected": set(), + }, + ] + for test in test_cases: + excluded_services = test["input"]["excluded_services"] + checks_to_run = test["input"]["checks_to_run"] + provider = test["input"]["provider"] + assert ( + exclude_services_to_run(checks_to_run, excluded_services, provider) + == test["expected"] + ) diff --git a/prowler.py b/prowler.py index d5d44250..84977d0f 100644 --- a/prowler.py +++ b/prowler.py @@ -6,6 +6,8 @@ import argparse from lib.banner import print_banner, print_version from lib.check.check import ( exclude_checks_to_run, + exclude_groups_to_run, + exclude_services_to_run, import_check, load_checks_to_execute, run_check, @@ -27,6 +29,11 @@ if __name__ == "__main__": group.add_argument("-g", "--groups", nargs="+", help="List of groups") parser.add_argument("-e", "--excluded-checks", nargs="+", help="Checks to exclude") + parser.add_argument("-E", "--excluded-groups", nargs="+", help="Groups to exclude") + parser.add_argument( + "-S", "--excluded-services", nargs="+", help="Services to exclude" + ) + parser.add_argument( "-b", "--no-banner", action="store_false", help="Hide Prowler Banner" ) @@ -80,6 +87,8 @@ if __name__ == "__main__": provider = args.provider checks = args.checks excluded_checks = args.excluded_checks + excluded_groups = args.excluded_groups + excluded_services = args.excluded_services services = args.services groups = args.groups checks_file = args.checks_file @@ -95,7 +104,14 @@ if __name__ == "__main__": if not args.role: logger.critical("To use -I/-T options -R option is needed") quit() + + if args.version: + print_version() + quit() + if args.no_banner: + print_banner() + if args.version: print_version() quit() @@ -117,10 +133,22 @@ if __name__ == "__main__": checks_to_execute = load_checks_to_execute( checks_file, checks, services, groups, provider ) - # Exclude checks if -e + # Exclude checks if -e/--excluded-checks if excluded_checks: checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks) + # Exclude groups if -g/--excluded-groups + if excluded_groups: + checks_to_execute = exclude_groups_to_run( + checks_to_execute, excluded_groups, provider + ) + + # Exclude services if -s/--excluded-services + if excluded_services: + checks_to_execute = exclude_services_to_run( + checks_to_execute, excluded_services, provider + ) + # Execute checks if len(checks_to_execute): for check_name in checks_to_execute: @@ -143,3 +171,7 @@ if __name__ == "__main__": logger.error( f"Check '{check_name}' was not found for the {provider.upper()} provider" ) + else: + logger.error( + "There are no checks to execute. Please, check your input arguments" + )