feat(checks): Select checks to run from provider using -c/--checks (#1197)

* feat(checks): Select checks to run

* Update providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2022-06-16 12:20:03 +02:00
committed by GitHub
parent 33c6801501
commit 162852634e
4 changed files with 95 additions and 58 deletions

View File

@@ -1,7 +1,57 @@
import importlib
import json
import pkgutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from lib.logger import logger
from lib.outputs import report
def load_checks_to_execute(check_list, provider):
checks_to_execute = set()
# LOADER
# Handle if there are checks passed using -c/--checks
if check_list:
for check_name in check_list:
checks_to_execute.add(check_name)
# If there are no checks passed as argument
else:
# Get all check modules to run with the specific provider
modules = recover_modules_from_provider(provider)
for check_module in modules:
# Recover check name from import path (last part)
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
check_name = check_module.split(".")[-1]
checks_to_execute.add(check_name)
return checks_to_execute
def recover_modules_from_provider(provider):
modules = []
for module_name in pkgutil.walk_packages(
importlib.import_module(f"providers.{provider}.services").__path__,
importlib.import_module(f"providers.{provider}.services").__name__ + ".",
):
# Format: "providers.{provider}.services.{service}.{check_name}.{check_name}"
if module_name.name.count(".") == 5:
modules.append(module_name.name)
return modules
def run_check(check):
print(f"\nCheck Name: {check.CheckName}")
logger.debug(f"Executing check: {check.CheckName}")
findings = check.execute()
report(findings)
def import_check(check_path):
lib = importlib.import_module(f"{check_path}")
return lib
@dataclass
class Check_Report:
@@ -9,6 +59,11 @@ class Check_Report:
region: str
result_extended: str
def __init__(self):
self.status = ""
self.region = ""
self.result_extended = ""
class Check(ABC):
def __init__(self):

14
lib/check_test.py Normal file
View File

@@ -0,0 +1,14 @@
import importlib
class Test_Check:
def test_import_check(self):
test_cases = [
{
"name": "Test valid check path",
"input": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials",
"expected": "providers.aws.services.iam.iam_disable_30_days_credentials.iam_disable_30_days_credentials",
}
]
for test in test_cases:
assert importlib.import_module(test["input"]).__name__ == test["expected"]

View File

@@ -9,13 +9,11 @@ maximum_expiration_days = 30
class iam_disable_30_days_credentials(Check):
def execute(self) -> Check_Report:
findings = []
report = Check_Report
response = iam_client.users
if response:
for user in response:
report = Check_Report
report = Check_Report()
if "PasswordLastUsed" in user and user["PasswordLastUsed"] != "":
try:
time_since_insertion = (
@@ -42,8 +40,10 @@ class iam_disable_30_days_credentials(Check):
# Append report
findings.append(report)
else:
report = Check_Report()
report.status = "PASS"
report.result_extended = "There is no IAM users"
report.region = "us-east-1"
findings.append(report)
return findings

View File

@@ -2,44 +2,17 @@
# -*- coding: utf-8 -*-
import argparse
import importlib
import pkgutil
from lib.banner import print_banner, print_version
from lib.check import import_check, load_checks_to_execute, run_check
from lib.logger import logger, logging_levels
from lib.outputs import report
from providers.aws.aws_provider import Input_Data, provider_set_session
def run_check(check):
print(f"\nCheck Name: {check.CheckName}")
findings = check.execute()
report(findings)
def import_check(check_path):
lib = importlib.import_module(f"{check_path}")
return lib
def recover_modules_from_provider(provider):
modules = []
for module_name in pkgutil.walk_packages(
importlib.import_module(f"providers.{provider}.services").__path__,
importlib.import_module(f"providers.{provider}.services").__name__ + ".",
):
if module_name.name.count(".") == 5:
modules.append(module_name.name)
return modules
if __name__ == "__main__":
# start_time = time.time()
# CLI Arguments
parser = argparse.ArgumentParser()
parser.add_argument("provider", help="Specify Provider: AWS")
parser.add_argument(
"-c", "--checks", nargs="+", help="Comma separated list of checks"
)
parser.add_argument("provider", choices=["aws"], help="Specify Provider")
parser.add_argument("-c", "--checks", nargs="+", help="List of checks")
parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler Banner"
)
@@ -49,7 +22,7 @@ if __name__ == "__main__":
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="CRITICAL",
default="ERROR",
help="Select Log Level",
)
parser.add_argument(
@@ -135,33 +108,28 @@ if __name__ == "__main__":
# Setting session
provider_set_session(session_input)
# libreria para generar la lista de checks
if checks:
for check in checks:
# Recover service from check name
service = check.split("_")[0]
# Import check module
lib = import_check(
f"providers.{provider}.services.{service}.{check}.{check}"
)
# Recover functions from check
check_to_execute = getattr(lib, check)
c = check_to_execute()
# Run check
run_check(c)
# Load checks to execute
logger.debug("Loading checks")
checks_to_execute = load_checks_to_execute(checks, provider)
else:
# Get all check modules to run
modules = recover_modules_from_provider(provider)
# Run checks
for check_module in modules:
print(check_module)
# Execute checks
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
try:
# Import check module
lib = import_check(check_module)
# Recover module from check name
check_name = check_module.split(".")[5]
check_module_path = (
f"providers.{provider}.services.{service}.{check_name}.{check_name}"
)
lib = import_check(check_module_path)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Run check
run_check(c)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
logger.error(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)