feat(display): add progress bar and summary table (#1512)

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Sergio Garcia
2022-11-22 11:18:43 +01:00
committed by GitHub
parent af1d85ae75
commit 9204142eaf
16 changed files with 320 additions and 106 deletions

View File

@@ -3,13 +3,15 @@ from pkgutil import walk_packages
from types import ModuleType
from typing import Any
from alive_progress import alive_bar
from colorama import Fore, Style
from config.config import groups_file
from lib.check.models import Output_From_Options, load_check_metadata
from config.config import groups_file, orange_color
from lib.check.models import Check, Output_From_Options, load_check_metadata
from lib.logger import logger
from lib.outputs.outputs import report
from lib.utils.utils import open_file, parse_json_file
from providers.aws.lib.audit_info.models import AWS_Audit_Info
# Load all checks metadata
@@ -189,6 +191,7 @@ def set_output_options(
security_hub_enabled: bool,
output_filename: str,
allowlist_file: str,
verbose: bool,
):
global output_options
output_options = Output_From_Options(
@@ -198,15 +201,18 @@ def set_output_options(
security_hub_enabled=security_hub_enabled,
output_filename=output_filename,
allowlist_file=allowlist_file,
verbose=verbose,
# set input options here
)
return output_options
def run_check(check, audit_info, output_options):
print(
f"\nCheck ID: {check.checkID} - {Fore.MAGENTA}{check.serviceName}{Fore.YELLOW} [{check.severity}]{Style.RESET_ALL}"
)
def run_check(check: Check, output_options: Output_From_Options) -> list:
findings = []
if output_options.verbose or output_options.is_quiet:
print(
f"\nCheck ID: {check.checkID} - {Fore.MAGENTA}{check.serviceName}{Fore.YELLOW} [{check.severity}]{Style.RESET_ALL}"
)
logger.debug(f"Executing check: {check.checkID}")
try:
findings = check.execute()
@@ -215,7 +221,54 @@ def run_check(check, audit_info, output_options):
logger.error(
f"{check.checkID} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
report(findings, output_options, audit_info)
finally:
pass
return findings
def execute_checks(
checks_to_execute: list,
provider: str,
audit_info: AWS_Audit_Info,
audit_output_options: Output_From_Options,
) -> list:
all_findings = []
print(
f"{Style.BRIGHT}Executing {len(checks_to_execute)} checks, please wait...{Style.RESET_ALL}\n"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
try:
# Import check module
check_module_path = (
f"providers.{provider}.services.{service}.{check_name}.{check_name}"
)
lib = import_check(check_module_path)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Run check
check_findings = run_check(c, audit_output_options)
all_findings.extend(check_findings)
report(check_findings, audit_output_options, audit_info)
bar()
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
logger.error(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return all_findings

View File

@@ -16,6 +16,7 @@ class Output_From_Options:
security_hub_enabled: bool
output_filename: str
allowlist_file: str
verbose: str
# Testing Pending