Files
prowler/prowler
2022-08-31 16:40:59 +02:00

344 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
from os import mkdir
from os.path import isdir
from config.config import default_output_directory, output_file_timestamp
from lib.banner import print_banner, print_version
from lib.check.check import (
bulk_load_checks_metadata,
exclude_checks_to_run,
exclude_groups_to_run,
exclude_services_to_run,
import_check,
list_groups,
list_services,
print_checks,
print_services,
run_check,
set_output_options,
)
from lib.check.checks_loader import load_checks_to_execute
from lib.logger import logger, set_logging_config
from lib.outputs.outputs import close_json, send_to_s3_bucket
from providers.aws.aws_provider import provider_set_session
from providers.aws.lib.security_hub.security_hub import (
resolve_security_hub_previous_findings,
)
if __name__ == "__main__":
    # Entry point of the Prowler CLI. In order, it:
    #   1. parses the command line,
    #   2. validates the role-assumption options,
    #   3. resolves the final list of checks to execute,
    #   4. sets up the provider session and the output options,
    #   5. runs every selected check, and
    #   6. finalizes outputs (closes JSON files, S3 upload, Security Hub).

    # CLI Arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "provider",
        choices=["aws"],
        nargs="?",
        default="aws",
        help="Specify Cloud Provider",
    )

    # Arguments to set checks to run
    # The following arguments need to be set exclusively
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-c", "--checks", nargs="+", help="List of checks")
    group.add_argument("-C", "--checks-file", nargs="?", help="List of checks")
    group.add_argument("-s", "--services", nargs="+", help="List of services")
    group.add_argument("-g", "--groups", nargs="+", help="List of groups")
    group.add_argument(
        "--severity",
        nargs="+",
        help="List of severities [informational, low, medium, high, critical]",
        choices=["informational", "low", "medium", "high", "critical"],
    )

    # Exclude checks options
    parser.add_argument("-e", "--excluded-checks", nargs="+", help="Checks to exclude")
    parser.add_argument("-E", "--excluded-groups", nargs="+", help="Groups to exclude")
    parser.add_argument("--excluded-services", nargs="+", help="Services to exclude")

    # List checks options
    list_group = parser.add_mutually_exclusive_group()
    list_group.add_argument(
        "-L", "--list-groups", action="store_true", help="List groups"
    )
    list_group.add_argument(
        "-l", "--list-checks", action="store_true", help="List checks"
    )
    list_group.add_argument(
        "--list-services", action="store_true", help="List services"
    )

    # NOTE: "store_false" means args.no_banner is True unless -b is passed,
    # i.e. a truthy args.no_banner means "print the banner".
    parser.add_argument(
        "-b", "--no-banner", action="store_false", help="Hide Prowler banner"
    )
    parser.add_argument(
        "-V", "-v", "--version", action="store_true", help="Show Prowler version"
    )
    parser.add_argument(
        "-q", "--quiet", action="store_true", help="Show only Prowler failed findings"
    )

    # Both options can be combined to only report to file some log level
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="CRITICAL",
        help="Select Log Level",
    )
    parser.add_argument(
        "--log-file",
        nargs="?",
        help="Set log file name",
    )

    # AWS session / role assumption options
    parser.add_argument(
        "-p",
        "--profile",
        nargs="?",
        default=None,
        help="AWS profile to launch prowler with",
    )
    parser.add_argument(
        "-R",
        "--role",
        nargs="?",
        default=None,
        help="ARN of the role to be assumed",
    )
    parser.add_argument(
        "-T",
        "--session-duration",
        nargs="?",
        default=3600,
        type=int,
        help="Assumed role session duration in seconds, by default 3600",
    )
    parser.add_argument(
        "-I",
        "--external-id",
        nargs="?",
        default=None,
        help="External ID to be passed when assuming role",
    )
    parser.add_argument(
        "-f",
        "--filter-region",
        nargs="+",
        help="AWS region names to run Prowler against",
    )

    # Output options
    parser.add_argument(
        "-M",
        "--output-modes",
        nargs="+",
        help="Output mode, by default csv",
        choices=["csv", "json", "json-asff"],
    )
    parser.add_argument(
        "-F",
        "--output-filename",
        nargs="?",
        default=None,
        help="Custom output report name, if not specified will use default output/prowler-output-ACCOUNT_NUM-OUTPUT_DATE.format.",
    )
    parser.add_argument(
        "-o",
        "--output-directory",
        nargs="?",
        help="Custom output directory, by default the folder where Prowler is stored",
        default=default_output_directory,
    )
    parser.add_argument(
        "-O",
        "--organizations-role",
        nargs="?",
        help="Specify AWS Organizations management role ARN to be assumed, to get Organization metadata",
    )
    parser.add_argument(
        "-S",
        "--security-hub",
        action="store_true",
        help="Send check output to AWS Security Hub",
    )

    # -B and -D are mutually exclusive: both name the destination bucket and
    # differ only in which credentials are used for the upload.
    bucket = parser.add_mutually_exclusive_group()
    bucket.add_argument(
        "-B",
        "--output-bucket",
        nargs="?",
        default=None,
        help="Custom output bucket, requires -M <mode> and it can work also with -o flag.",
    )
    bucket.add_argument(
        "-D",
        "--output-bucket-no-assume",
        nargs="?",
        default=None,
        help="Same as -B but do not use the assumed role credentials to put objects to the bucket, instead uses the initial credentials.",
    )

    # Parse Arguments
    args = parser.parse_args()

    provider = args.provider
    checks = args.checks
    excluded_checks = args.excluded_checks
    excluded_groups = args.excluded_groups
    excluded_services = args.excluded_services
    services = args.services
    groups = args.groups
    checks_file = args.checks_file
    output_directory = args.output_directory
    output_filename = args.output_filename
    severities = args.severity
    output_modes = args.output_modes

    # Set Logger configuration
    set_logging_config(args.log_file, args.log_level)

    # Role assumption input options tests
    # The accepted interval is inclusive on both ends (900..43200), matching
    # the error message. A bare "-T" with no value parses to None (nargs="?"),
    # which is rejected here as well.
    if args.session_duration is None or not 900 <= args.session_duration <= 43200:
        logger.critical("Value for -T option must be between 900 and 43200")
        # Exit with a non-zero status so callers can detect the failure
        sys.exit(1)

    # -I and a non-default -T only make sense when a role is being assumed
    if (args.session_duration != 3600 or args.external_id) and not args.role:
        logger.critical("To use -I/-T options -R option is needed")
        sys.exit(1)

    if args.version:
        print_version()
        sys.exit()

    # Truthy unless -b/--no-banner was passed (see the argument definition)
    if args.no_banner:
        print_banner()

    if args.list_groups:
        list_groups(provider)
        sys.exit()

    if args.list_services:
        print_services(list_services(provider))
        sys.exit()

    # Load checks metadata
    logger.debug("Loading checks metadata from .metadata.json files")
    bulk_checks_metadata = bulk_load_checks_metadata(provider)

    # Load checks to execute
    checks_to_execute = load_checks_to_execute(
        bulk_checks_metadata,
        checks_file,
        checks,
        services,
        groups,
        severities,
        provider,
    )

    # Exclude checks if -e/--excluded-checks
    if excluded_checks:
        checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks)

    # Exclude groups if -E/--excluded-groups
    if excluded_groups:
        checks_to_execute = exclude_groups_to_run(
            checks_to_execute, excluded_groups, provider
        )

    # Exclude services if --excluded-services
    if excluded_services:
        checks_to_execute = exclude_services_to_run(
            checks_to_execute, excluded_services, provider
        )

    # Sort final check list
    checks_to_execute = sorted(checks_to_execute)

    # If -l/--list-checks passed as argument, print checks to execute and quit
    if args.list_checks:
        print_checks(provider, checks_to_execute, bulk_checks_metadata)
        sys.exit()

    # If Security Hub sending is enabled, the json-asff output is needed.
    # Add it only once: a duplicated mode would close and upload the same
    # file twice in the finalization loop below.
    if args.security_hub:
        if not output_modes:
            output_modes = ["json-asff"]
        elif "json-asff" not in output_modes:
            output_modes.append("json-asff")

    # Check output directory, if it is not created -> create it
    # (only when at least one output mode was requested)
    if output_directory and not isdir(output_directory) and output_modes:
        mkdir(output_directory)

    # Set global session
    audit_info = provider_set_session(
        args.profile,
        args.role,
        args.session_duration,
        args.external_id,
        args.filter_region,
        args.organizations_role,
    )

    # Check if custom output filename was input, if not, set the default
    if not output_filename:
        output_filename = (
            f"prowler-output-{audit_info.audited_account}-{output_file_timestamp}"
        )

    # Setting output options
    audit_output_options = set_output_options(
        args.quiet, output_modes, output_directory, args.security_hub, output_filename
    )

    # Execute checks
    if checks_to_execute:
        for check_name in checks_to_execute:
            # Recover service from check name (prefix before the first "_")
            service = check_name.split("_")[0]
            try:
                # Import check module
                check_module_path = (
                    f"providers.{provider}.services.{service}.{check_name}.{check_name}"
                )
                lib = import_check(check_module_path)
                # Recover the check class from the module and instantiate it
                check_to_execute = getattr(lib, check_name)
                c = check_to_execute()
                # Run check
                run_check(c, audit_info, audit_output_options)
            # If check does not exist in the provider or is from another provider
            except ModuleNotFoundError:
                logger.error(
                    f"Check '{check_name}' was not found for the {provider.upper()} provider"
                )
    else:
        logger.error(
            "There are no checks to execute. Please, check your input arguments"
        )

    if output_modes:
        # Resolve the S3 destination once; it does not depend on the mode.
        send_to_bucket = bool(args.output_bucket or args.output_bucket_no_assume)
        if send_to_bucket:
            if args.output_bucket_no_assume:
                # -D: upload with the initial (non-assumed) credentials
                output_bucket = args.output_bucket_no_assume
                bucket_session = audit_info.original_session
            else:
                # -B: upload with the audit session credentials
                output_bucket = args.output_bucket
                bucket_session = audit_info.audit_session

        for mode in output_modes:
            # Close json file if exists
            if mode in ("json", "json-asff"):
                close_json(output_filename, output_directory, mode)
            # Send output to S3 if needed (-B / -D)
            if send_to_bucket:
                send_to_s3_bucket(
                    output_filename,
                    output_directory,
                    mode,
                    output_bucket,
                    bucket_session,
                )

    # Resolve previous fails of Security Hub
    if args.security_hub:
        resolve_security_hub_previous_findings(output_directory, audit_info)