From 162852634e75c0192da68bf620ad770a019e888f Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Thu, 16 Jun 2022 12:20:03 +0200 Subject: [PATCH] feat(checks): Select checks to run from provider using `-c/--checks` (#1197) * feat(checks): Select checks to run * Update providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com> --- lib/check.py | 55 +++++++++++++ lib/check_test.py | 14 ++++ .../iam_disable_30_days_credentials.py | 6 +- prowler.py | 78 ++++++------------- 4 files changed, 95 insertions(+), 58 deletions(-) create mode 100644 lib/check_test.py diff --git a/lib/check.py b/lib/check.py index 525e14df..2819e8d2 100644 --- a/lib/check.py +++ b/lib/check.py @@ -1,7 +1,57 @@ +import importlib import json +import pkgutil from abc import ABC, abstractmethod from dataclasses import dataclass +from lib.logger import logger +from lib.outputs import report + + +def load_checks_to_execute(check_list, provider): + checks_to_execute = set() + # LOADER + # Handle if there are checks passed using -c/--checks + if check_list: + for check_name in check_list: + checks_to_execute.add(check_name) + + # If there are no checks passed as argument + else: + # Get all check modules to run with the specific provider + modules = recover_modules_from_provider(provider) + for check_module in modules: + # Recover check name from import path (last part) + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + check_name = check_module.split(".")[-1] + checks_to_execute.add(check_name) + + return checks_to_execute + + +def recover_modules_from_provider(provider): + modules = [] + for module_name in pkgutil.walk_packages( + importlib.import_module(f"providers.{provider}.services").__path__, + importlib.import_module(f"providers.{provider}.services").__name__ + ".", + ): + # Format: "providers.{provider}.services.{service}.{check_name}.{check_name}" + if module_name.name.count(".") == 5: + modules.append(module_name.name) + return modules + + +def run_check(check): + print(f"\nCheck Name: {check.CheckName}") + logger.debug(f"Executing check: {check.CheckName}") + findings = check.execute() + report(findings) + + +def import_check(check_path): + lib = importlib.import_module(f"{check_path}") + return lib + @dataclass class Check_Report: @@ -9,6 +59,11 @@ class Check_Report: region: str result_extended: str + def __init__(self): + self.status = "" + self.region = "" + self.result_extended = "" + class Check(ABC): def __init__(self): diff --git a/lib/check_test.py b/lib/check_test.py new file mode 100644 index 00000000..9aee19ad --- /dev/null +++ b/lib/check_test.py @@ -0,0 +1,14 @@ +import importlib + + +class Test_Check: + def test_import_check(self): + test_cases = [ + { + "name": "Test valid check path", + "input": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials", + "expected": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials", + } + ] + for test in test_cases: + assert importlib.import_module(test["input"]).__name__ == test["expected"] diff --git a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py index 449f907c..3f602006 100644 --- a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py +++ b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py @@ -9,13 +9,11 @@ maximum_expiration_days = 30 class iam_disable_30_days_credentials(Check): def execute(self) -> Check_Report: findings = [] - report = Check_Report - response = iam_client.users if response: for user in response: - report = Check_Report + report = Check_Report() if "PasswordLastUsed" in user and user["PasswordLastUsed"] != "": try: time_since_insertion = ( @@ -42,8 +40,10 @@ class iam_disable_30_days_credentials(Check): # Append report findings.append(report) else: + report = Check_Report() report.status = "PASS" report.result_extended = "There is no IAM users" report.region = "us-east-1" + findings.append(report) return findings diff --git a/prowler.py b/prowler.py index f5d630fc..d8c3eb79 100644 --- a/prowler.py +++ b/prowler.py @@ -2,44 +2,17 @@ # -*- coding: utf-8 -*- import argparse -import importlib -import pkgutil from lib.banner import print_banner, print_version +from lib.check import import_check, load_checks_to_execute, run_check from lib.logger import logger, logging_levels -from lib.outputs import report from providers.aws.aws_provider import Input_Data, provider_set_session - -def run_check(check): - print(f"\nCheck Name: {check.CheckName}") - findings = check.execute() - report(findings) - - -def import_check(check_path): - lib = importlib.import_module(f"{check_path}") - return lib - - -def recover_modules_from_provider(provider): - modules = [] - for module_name in pkgutil.walk_packages( - importlib.import_module(f"providers.{provider}.services").__path__, - importlib.import_module(f"providers.{provider}.services").__name__ + ".", - ): - if module_name.name.count(".") == 5: - modules.append(module_name.name) - return modules - - if __name__ == "__main__": - # start_time = time.time() + # CLI Arguments parser = argparse.ArgumentParser() - parser.add_argument("provider", help="Specify Provider: AWS") - parser.add_argument( - "-c", "--checks", nargs="+", help="Comma separated list of checks" - ) + parser.add_argument("provider", choices=["aws"], help="Specify Provider") + parser.add_argument("-c", "--checks", nargs="+", help="List of checks") parser.add_argument( "-b", "--no-banner", action="store_false", help="Hide Prowler Banner" ) @@ -49,7 +22,7 @@ if __name__ == "__main__": parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - default="CRITICAL", + default="ERROR", help="Select Log Level", ) parser.add_argument( @@ -135,33 +108,28 @@ if __name__ == "__main__": # Setting session provider_set_session(session_input) - # libreria para generar la lista de checks - if checks: - for check in checks: - # Recover service from check name - service = check.split("_")[0] - # Import check module - lib = import_check( - f"providers.{provider}.services.{service}.{check}.{check}" - ) - # Recover functions from check - check_to_execute = getattr(lib, check) - c = check_to_execute() - # Run check - run_check(c) + # Load checks to execute + logger.debug("Loading checks") + checks_to_execute = load_checks_to_execute(checks, provider) - else: - # Get all check modules to run - modules = recover_modules_from_provider(provider) - # Run checks - for check_module in modules: - print(check_module) + # Execute checks + for check_name in checks_to_execute: + # Recover service from check name + service = check_name.split("_")[0] + try: # Import check module - lib = import_check(check_module) - # Recover module from check name - check_name = check_module.split(".")[5] + check_module_path = ( + f"providers.{provider}.services.{service}.{check_name}.{check_name}" + ) + lib = import_check(check_module_path) # Recover functions from check check_to_execute = getattr(lib, check_name) c = check_to_execute() # Run check run_check(c) + + # If check does not exists in the provider or is from another provider + except ModuleNotFoundError: + logger.error( + f"Check '{check_name}' was not found for the {provider.upper()} provider" + )