feat(exclude-groups-and-services) (#1205)

This commit is contained in:
Pepe Fagoaga
2022-06-21 08:05:32 +02:00
committed by GitHub
parent e52ab12696
commit 21f8f56c18
4 changed files with 157 additions and 6 deletions

View File

@@ -3,6 +3,10 @@
"gdpr": [
"check11",
"check12"
],
"iam": [
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials"
]
}
}

View File

@@ -18,6 +18,36 @@ def exclude_checks_to_run(checks_to_execute: set, excluded_checks: list) -> set:
return checks_to_execute
# Exclude groups to run
def exclude_groups_to_run(
checks_to_execute: set, excluded_groups: list, provider: str
) -> set:
# Recover checks from the input groups
checks_from_groups = parse_groups_from_file(groups_file, excluded_groups, provider)
for check_name in checks_from_groups:
checks_to_execute.discard(check_name)
return checks_to_execute
def exclude_services_to_run(
checks_to_execute: set, excluded_services: list, provider: str
) -> set:
# Recover checks from the input services
for service in excluded_services:
modules = recover_modules_from_provider(provider, service)
if not modules:
logger.error(f"Service '{service}' was not found for the AWS provider")
else:
for check_module in modules:
# Recover check name and module name from import path
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module.split(".")[-1]
# Exclude checks from the input services
checks_to_execute.discard(check_name)
return checks_to_execute
# Load checks from checklist.json
def parse_checks_from_file(input_file: str, provider: str) -> set:
checks_to_execute = set()
@@ -31,9 +61,9 @@ def parse_checks_from_file(input_file: str, provider: str) -> set:
# Load checks from groups.json
def parse_groups_from_file(group_list: list, provider: str) -> set:
def parse_groups_from_file(group_file: str, group_list: list, provider: str) -> set:
checks_to_execute = set()
f = open_file(groups_file)
f = open_file(group_file)
available_groups = parse_json_file(f)
for group in group_list:
@@ -89,7 +119,9 @@ def load_checks_to_execute(
# Handle if there are groups passed using -g/--groups
elif group_list:
try:
checks_to_execute = parse_groups_from_file(group_list, provider)
checks_to_execute = parse_groups_from_file(
groups_file, group_list, provider
)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")

View File

@@ -2,6 +2,8 @@ import os
from lib.check.check import (
exclude_checks_to_run,
exclude_groups_to_run,
exclude_services_to_run,
parse_checks_from_file,
parse_groups_from_file,
)
@@ -37,14 +39,21 @@ class Test_Check:
def test_parse_groups_from_file(self):
test_cases = [
{
"input": {"groups": ["gdpr"], "provider": "aws"},
"input": {
"groups": ["gdpr"],
"provider": "aws",
"group_file": f"{os.path.dirname(os.path.realpath(__name__))}/groups.json",
},
"expected": {"check11", "check12"},
}
]
for test in test_cases:
provider = test["input"]["provider"]
groups = test["input"]["groups"]
assert parse_groups_from_file(groups, provider) == test["expected"]
group_file = test["input"]["group_file"]
assert (
parse_groups_from_file(group_file, groups, provider) == test["expected"]
)
def test_exclude_checks_to_run(self):
test_cases = [
@@ -69,3 +78,77 @@ class Test_Check:
assert (
exclude_checks_to_run(check_list, excluded_checks) == test["expected"]
)
def test_exclude_groups_to_run(self):
test_cases = [
{
"input": {
"excluded_group_list": {"gdpr"},
"provider": "aws",
"checks_to_run": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
"expected": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
{
"input": {
"excluded_group_list": {"iam"},
"provider": "aws",
"checks_to_run": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
"expected": set(),
},
]
for test in test_cases:
excluded_group_list = test["input"]["excluded_group_list"]
checks_to_run = test["input"]["checks_to_run"]
provider = test["input"]["provider"]
assert (
exclude_groups_to_run(checks_to_run, excluded_group_list, provider)
== test["expected"]
)
def test_exclude_services_to_run(self):
test_cases = [
{
"input": {
"checks_to_run": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
"excluded_services": {"ec2"},
"provider": "aws",
},
"expected": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
{
"input": {
"checks_to_run": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
"excluded_services": {"iam"},
"provider": "aws",
},
"expected": set(),
},
]
for test in test_cases:
excluded_services = test["input"]["excluded_services"]
checks_to_run = test["input"]["checks_to_run"]
provider = test["input"]["provider"]
assert (
exclude_services_to_run(checks_to_run, excluded_services, provider)
== test["expected"]
)

View File

@@ -6,6 +6,8 @@ import argparse
from lib.banner import print_banner, print_version
from lib.check.check import (
exclude_checks_to_run,
exclude_groups_to_run,
exclude_services_to_run,
import_check,
load_checks_to_execute,
run_check,
@@ -27,6 +29,11 @@ if __name__ == "__main__":
group.add_argument("-g", "--groups", nargs="+", help="List of groups")
parser.add_argument("-e", "--excluded-checks", nargs="+", help="Checks to exclude")
parser.add_argument("-E", "--excluded-groups", nargs="+", help="Groups to exclude")
parser.add_argument(
"-S", "--excluded-services", nargs="+", help="Services to exclude"
)
parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler Banner"
)
@@ -80,6 +87,8 @@ if __name__ == "__main__":
provider = args.provider
checks = args.checks
excluded_checks = args.excluded_checks
excluded_groups = args.excluded_groups
excluded_services = args.excluded_services
services = args.services
groups = args.groups
checks_file = args.checks_file
@@ -95,7 +104,14 @@ if __name__ == "__main__":
if not args.role:
logger.critical("To use -I/-T options -R option is needed")
quit()
if args.version:
print_version()
quit()
if args.no_banner:
print_banner()
if args.version:
print_version()
quit()
@@ -117,10 +133,22 @@ if __name__ == "__main__":
checks_to_execute = load_checks_to_execute(
checks_file, checks, services, groups, provider
)
# Exclude checks if -e
# Exclude checks if -e/--excluded-checks
if excluded_checks:
checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks)
# Exclude groups if -g/--excluded-groups
if excluded_groups:
checks_to_execute = exclude_groups_to_run(
checks_to_execute, excluded_groups, provider
)
# Exclude services if -s/--excluded-services
if excluded_services:
checks_to_execute = exclude_services_to_run(
checks_to_execute, excluded_services, provider
)
# Execute checks
if len(checks_to_execute):
for check_name in checks_to_execute:
@@ -143,3 +171,7 @@ if __name__ == "__main__":
logger.error(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)
else:
logger.error(
"There are no checks to execute. Please, check your input arguments"
)