feat(args): Global and provider-specific arguments (#1540)

This commit is contained in:
Pepe Fagoaga
2022-12-14 17:39:05 +01:00
committed by GitHub
parent 27a79d9c8c
commit 2a5f032a52
10 changed files with 563 additions and 452 deletions

View File

@@ -1,18 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
from os import mkdir
from os.path import isdir
from prowler.config.config import (
change_config_var,
default_output_directory,
get_aws_available_regions,
output_file_timestamp,
)
from prowler.lib.banner import print_banner, print_version
from prowler.lib.banner import print_banner
from prowler.lib.check.check import (
bulk_load_checks_metadata,
bulk_load_compliance_frameworks,
@@ -26,10 +17,10 @@ from prowler.lib.check.check import (
print_compliance_frameworks,
print_compliance_requirements,
print_services,
set_output_options,
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.outputs import (
add_html_footer,
@@ -43,241 +34,17 @@ from prowler.providers.aws.lib.quick_inventory.quick_inventory import quick_inve
from prowler.providers.aws.lib.security_hub.security_hub import (
resolve_security_hub_previous_findings,
)
from prowler.providers.common.common import set_provider_audit_info
from prowler.providers.common.audit_info import set_provider_audit_info
from prowler.providers.common.outputs import set_provider_output_options
def prowler():
# CLI Arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"provider",
choices=["aws", "azure"],
nargs="?",
default="aws",
help="Specify Cloud Provider",
)
# Arguments to set checks to run
# The following arguments needs to be set exclusivelly
group = parser.add_mutually_exclusive_group()
group.add_argument(
"-c", "--checks", nargs="+", help="List of checks to be executed."
)
group.add_argument(
"-C",
"--checks-file",
nargs="?",
help="Json file containing checks to be executed.",
)
group.add_argument(
"-s", "--services", nargs="+", help="List of services to be executed."
)
group.add_argument(
"--severity",
nargs="+",
help="List of severities to be executed [informational, low, medium, high, critical]",
choices=["informational", "low", "medium", "high", "critical"],
)
group.add_argument(
"--compliance",
nargs="+",
help="Compliance Framework to check against for. The format should be the following: framework_version_provider (e.g.: ens_rd2022_aws)",
choices=["ens_rd2022_aws", "cis_1.4_aws", "cis_1.5_aws"],
)
group.add_argument(
"--categories", nargs="+", help="List of categories to be executed.", default=[]
)
# Exclude checks options
parser.add_argument("-e", "--excluded-checks", nargs="+", help="Checks to exclude")
parser.add_argument("--excluded-services", nargs="+", help="Services to exclude")
# List checks options
list_group = parser.add_mutually_exclusive_group()
list_group.add_argument(
"-l", "--list-checks", action="store_true", help="List checks"
)
list_group.add_argument(
"--list-services", action="store_true", help="List services"
)
list_group.add_argument(
"--list-compliance", action="store_true", help="List compliance frameworks"
)
list_group.add_argument(
"--list-compliance-requirements",
nargs="+",
help="List compliance requirements for a given requirement",
choices=["ens_rd2022_aws", "cis_1.4_aws", "cis_1.5_aws"],
)
list_group.add_argument(
"--list-categories",
action="store_true",
help="List the available check's categories",
)
parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler banner"
)
parser.add_argument(
"-V", "-v", "--version", action="store_true", help="Show Prowler version"
)
parser.add_argument(
"-q", "--quiet", action="store_true", help="Show only Prowler failed findings"
)
# Both options can be combined to only report to file some log level
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="CRITICAL",
help="Select Log Level",
)
parser.add_argument(
"--log-file",
nargs="?",
help="Set log file name",
)
parser.add_argument(
"-p",
"--profile",
nargs="?",
default=None,
help="AWS profile to launch prowler with",
)
parser.add_argument(
"-R",
"--role",
nargs="?",
default=None,
help="ARN of the role to be assumed",
)
parser.add_argument(
"-T",
"--session-duration",
nargs="?",
default=3600,
type=int,
help="Assumed role session duration in seconds, by default 3600",
)
parser.add_argument(
"-I",
"--external-id",
nargs="?",
default=None,
help="External ID to be passed when assuming role",
)
parser.add_argument(
"-f",
"--filter-region",
nargs="+",
help="AWS region names to run Prowler against",
choices=get_aws_available_regions(),
)
parser.add_argument(
"-M",
"--output-modes",
nargs="+",
help="Output modes, by default csv, html and json",
default=["csv", "json", "html"],
choices=["csv", "json", "json-asff", "html"],
)
parser.add_argument(
"-F",
"--output-filename",
nargs="?",
default=None,
help="Custom output report name, if not specified will use default output/prowler-output-ACCOUNT_NUM-OUTPUT_DATE.format.",
)
parser.add_argument(
"-o",
"--output-directory",
nargs="?",
help="Custom output directory, by default the folder where Prowler is stored",
default=default_output_directory,
)
parser.add_argument(
"-O",
"--organizations-role",
nargs="?",
help="Specify AWS Organizations management role ARN to be assumed, to get Organization metadata",
)
parser.add_argument(
"-S",
"--security-hub",
action="store_true",
help="Send check output to AWS Security Hub",
)
parser.add_argument(
"-i",
"--quick-inventory",
action="store_true",
help="Run Prowler Quick Inventory. The inventory will be stored in an output csv by default",
)
bucket = parser.add_mutually_exclusive_group()
bucket.add_argument(
"-B",
"--output-bucket",
nargs="?",
default=None,
help="Custom output bucket, requires -M <mode> and it can work also with -o flag.",
)
bucket.add_argument(
"-D",
"--output-bucket-no-assume",
nargs="?",
default=None,
help="Same as -B but do not use the assumed role credentials to put objects to the bucket, instead uses the initial credentials.",
)
parser.add_argument(
"-N",
"--shodan",
nargs="?",
default=None,
help="Shodan API key used by check ec2_elastic_ip_shodan.",
)
parser.add_argument(
"-w",
"--allowlist-file",
nargs="?",
default=None,
help="Path for allowlist yaml file. See example prowler/config/allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table ARN or S3 URI, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Display detailed information about findings.",
)
parser.add_argument(
"--subscription-ids",
nargs="+",
default=[],
help="Azure subscription ids to be scanned by prowler",
)
az_auth = parser.add_mutually_exclusive_group()
az_auth.add_argument(
"--az-cli-auth",
action="store_true",
help="Use Azure cli credentials to log in against azure",
)
az_auth.add_argument(
"--sp-env-auth",
action="store_true",
help="Use service principal env variables authentication to log in against azure",
)
az_auth.add_argument(
"--browser-auth",
action="store_true",
help="Use browser authentication to log in against azure ",
)
az_auth.add_argument(
"--managed-identity-auth",
action="store_true",
help="Use managed identity authentication to log in against azure ",
)
# Parse Arguments
args = parser.parse_args()
parser = ProwlerArgumentParser()
args = parser.parser.parse_args()
# Save Arguments
provider = args.provider
checks = args.checks
excluded_checks = args.excluded_checks
@@ -285,39 +52,16 @@ def prowler():
services = args.services
categories = args.categories
checks_file = args.checks_file
output_directory = args.output_directory
output_filename = args.output_filename
severities = args.severity
compliance_framework = args.compliance
output_modes = args.output_modes
# Azure options
subscriptions = args.subscription_ids
az_cli_auth = args.az_cli_auth
sp_env_auth = args.sp_env_auth
browser_auth = args.browser_auth
managed_entity_auth = args.managed_identity_auth
# We treat the compliance framework as another output format
if compliance_framework:
output_modes.extend(compliance_framework)
args.output_modes.extend(compliance_framework)
# Set Logger configuration
set_logging_config(args.log_file, args.log_level)
# Role assumption input options tests
if args.session_duration not in range(900, 43200):
logger.critical("Value for -T option must be between 900 and 43200")
sys.exit()
if args.session_duration != 3600 or args.external_id:
if not args.role:
logger.critical("To use -I/-T options -R option is needed")
sys.exit()
if args.version:
print_version()
sys.exit()
if args.no_banner:
print_banner(args)
@@ -325,9 +69,6 @@ def prowler():
print_services(list_services(provider))
sys.exit()
if args.shodan:
change_config_var("shodan_api_key", args.shodan)
# Load checks metadata
logger.debug("Loading checks metadata from .metadata.json files")
bulk_checks_metadata = bulk_load_checks_metadata(provider)
@@ -392,45 +133,8 @@ def prowler():
print_checks(provider, checks_to_execute, bulk_checks_metadata)
sys.exit()
# If security hub sending enabled, it is need to create json-asff output
if args.security_hub:
if not output_modes:
output_modes = ["json-asff"]
else:
output_modes.append("json-asff")
# Check output directory, if it is not created -> create it
if output_directory:
if not isdir(output_directory):
if output_modes:
mkdir(output_directory)
arguments = {
"profile": args.profile,
"role": args.role,
"session_duration": args.session_duration,
"external_id": args.external_id,
"regions": args.filter_region,
"organizations_role": args.organizations_role,
"subscriptions": subscriptions,
"az_cli_auth": az_cli_auth,
"sp_env_auth": sp_env_auth,
"browser_auth": browser_auth,
"managed_entity_auth": managed_entity_auth,
}
audit_info = set_provider_audit_info(provider, arguments)
# Check if custom output filename was input, if not, set the default
if not output_filename:
if provider == "aws":
output_filename = (
f"prowler-output-{audit_info.audited_account}-{output_file_timestamp}"
)
elif provider == "azure":
if audit_info.identity.domain:
output_filename = f"prowler-output-{audit_info.identity.domain}-{output_file_timestamp}"
else:
output_filename = f"prowler-output-{'-'.join(audit_info.identity.tenant_ids)}-{output_file_timestamp}"
# Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__)
# Parse content from Allowlist file and get it, if necessary, from S3
if args.allowlist_file:
@@ -438,76 +142,75 @@ def prowler():
else:
allowlist_file = None
# Setting output options
audit_output_options = set_output_options(
args.quiet,
output_modes,
output_directory,
args.security_hub,
output_filename,
allowlist_file,
bulk_checks_metadata,
args.verbose,
# Setting output options based on the selected provider
audit_output_options = set_provider_output_options(
provider, args, audit_info, allowlist_file, bulk_checks_metadata
)
if args.quick_inventory and provider == "aws":
quick_inventory(audit_info, output_directory)
# Quick Inventory for AWS
if provider == "aws" and args.quick_inventory:
quick_inventory(audit_info, args.output_directory)
sys.exit()
# Execute checks
findings = []
if len(checks_to_execute):
findings = execute_checks(
checks_to_execute, provider, audit_info, audit_output_options
)
else:
# Execute checks
findings = []
if len(checks_to_execute):
findings = execute_checks(
checks_to_execute, provider, audit_info, audit_output_options
)
else:
logger.error(
"There are no checks to execute. Please, check your input arguments"
)
if output_modes:
for mode in output_modes:
# Close json file if exists
if mode == "json" or mode == "json-asff":
close_json(output_filename, output_directory, mode)
if mode == "html":
add_html_footer(output_filename, output_directory)
# Send output to S3 if needed (-B / -D)
if args.output_bucket or args.output_bucket_no_assume:
output_bucket = args.output_bucket
bucket_session = audit_info.audit_session
# Check if -D was input
if args.output_bucket_no_assume:
output_bucket = args.output_bucket_no_assume
bucket_session = audit_info.original_session
send_to_s3_bucket(
output_filename,
output_directory,
mode,
output_bucket,
bucket_session,
)
# Resolve previous fails of Security Hub
if args.security_hub:
resolve_security_hub_previous_findings(output_directory, audit_info)
# Display summary table
display_summary_table(
findings,
audit_info,
audit_output_options,
provider,
logger.error(
"There are no checks to execute. Please, check your input arguments"
)
if compliance_framework and findings:
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
audit_output_options.output_filename,
audit_output_options.output_directory,
)
if args.output_modes:
for mode in args.output_modes:
# Close json file if exists
if mode == "json" or mode == "json-asff":
close_json(
audit_output_options.output_filename, args.output_directory, mode
)
if mode == "html":
add_html_footer(
audit_output_options.output_filename, args.output_directory
)
# Send output to S3 if needed (-B / -D)
if args.output_bucket or args.output_bucket_no_assume:
output_bucket = args.output_bucket
bucket_session = audit_info.audit_session
# Check if -D was input
if args.output_bucket_no_assume:
output_bucket = args.output_bucket_no_assume
bucket_session = audit_info.original_session
send_to_s3_bucket(
audit_output_options.output_filename,
args.output_directory,
mode,
output_bucket,
bucket_session,
)
# Resolve previous fails of Security Hub
if args.security_hub:
resolve_security_hub_previous_findings(args.output_directory, audit_info)
# Display summary table
display_summary_table(
findings,
audit_info,
audit_output_options,
provider,
)
if compliance_framework and findings:
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
audit_output_options.output_filename,
audit_output_options.output_directory,
)
if __name__ == "__main__":

View File

@@ -0,0 +1,7 @@
{
"aws": [
"glue_development_endpoints_cloudwatch_logs_encryption_enabled",
"emr_cluster_account_public_block_enabled",
"ec2_instance_public_ip"
]
}

View File

@@ -3,10 +3,6 @@ from colorama import Fore, Style
from prowler.config.config import banner_color, orange_color, prowler_version, timestamp
def print_version():
print(f"Prowler {prowler_version}")
def print_banner(args):
banner = f"""{banner_color} _
_ __ _ __ _____ _| | ___ _ __

View File

@@ -10,11 +10,12 @@ from colorama import Fore, Style
from prowler.config.config import orange_color
from prowler.lib.check.compliance_models import load_compliance_framework
from prowler.lib.check.models import Check, Output_From_Options, load_check_metadata
from prowler.lib.check.models import Check, load_check_metadata
from prowler.lib.logger import logger
from prowler.lib.outputs.outputs import report
from prowler.lib.utils.utils import open_file, parse_json_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.outputs import Provider_Output_Options
# Load all checks metadata
@@ -278,34 +279,7 @@ def import_check(check_path: str) -> ModuleType:
return lib
# Sets the Output_From_Options to be used in the output modes
def set_output_options(
quiet: bool,
output_modes: list,
input_output_directory: str,
security_hub_enabled: bool,
output_filename: str,
allowlist_file: str,
bulk_checks_metadata: dict,
verbose: bool,
):
"""Sets the Output_From_Options to be used in the output modes"""
global output_options
output_options = Output_From_Options(
is_quiet=quiet,
output_modes=output_modes,
output_directory=input_output_directory,
security_hub_enabled=security_hub_enabled,
output_filename=output_filename,
allowlist_file=allowlist_file,
bulk_checks_metadata=bulk_checks_metadata,
verbose=verbose,
# set input options here
)
return output_options
def run_check(check: Check, output_options: Output_From_Options) -> list:
def run_check(check: Check, output_options: Provider_Output_Options) -> list:
findings = []
if output_options.verbose or output_options.is_quiet:
print(
@@ -327,7 +301,7 @@ def execute_checks(
checks_to_execute: list,
provider: str,
audit_info: AWS_Audit_Info,
audit_output_options: Output_From_Options,
audit_output_options: Provider_Output_Options,
) -> list:
all_findings = []
print(

View File

@@ -8,20 +8,6 @@ from pydantic import BaseModel, ValidationError
from prowler.lib.logger import logger
@dataclass
class Output_From_Options:
"""Class to store the Prowler output modes options"""
is_quiet: bool
output_modes: list
output_directory: str
security_hub_enabled: bool
output_filename: str
allowlist_file: str
bulk_checks_metadata: dict
verbose: str
class Code(BaseModel):
"""Check's remediation information using IaC like CloudFormation, Terraform or the native CLI"""

321
prowler/lib/cli/parser.py Normal file
View File

@@ -0,0 +1,321 @@
import argparse
from prowler.config.config import default_output_directory, prowler_version
class ProwlerArgumentParser:
# Set the default parser
def __init__(self):
# CLI Arguments
self.parser = argparse.ArgumentParser(
prog="prowler",
epilog="To see the different available options on a specific provider, run: prowler {provider} -h",
)
# Default
self.parser.add_argument(
"-v",
"--version",
action="version",
version=f"Prowler {prowler_version}",
help="show Prowler version",
)
# Common arguments parser
self.common_providers_parser = argparse.ArgumentParser(add_help=False)
# Providers Parser
self.subparsers = self.parser.add_subparsers(
title="Prowler Available Cloud Providers", dest="provider"
)
self.__init_allowlist_parser__()
self.__init_outputs_parser__()
self.__init_logging_parser__()
self.__init_checks_parser__()
self.__init_exclude_checks_parser__()
self.__init_list_checks_parser__()
# Init Providers Arguments
self.__init_aws_parser__()
self.__init_azure_parser__()
def __init_allowlist_parser__(self):
# Allowlist
allowlist_parser = self.common_providers_parser.add_argument_group("Allowlist")
allowlist_parser.add_argument(
"-w",
"--allowlist-file",
nargs="?",
default=None,
help="Path for allowlist yaml file. See example prowler/config/allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table ARN or S3 URI, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
)
def __init_outputs_parser__(self):
# Outputs
common_outputs_parser = self.common_providers_parser.add_argument_group(
"Outputs"
)
common_outputs_parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="Show only Prowler failed findings",
)
common_outputs_parser.add_argument(
"-M",
"--output-modes",
nargs="+",
help="Output modes, by default csv, html and json",
default=["csv", "json", "html"],
choices=["csv", "json", "json-asff", "html"],
)
common_outputs_parser.add_argument(
"-F",
"--output-filename",
nargs="?",
default=None,
help="Custom output report name, if not specified will use default output/prowler-output-ACCOUNT_NUM-OUTPUT_DATE.format",
)
common_outputs_parser.add_argument(
"-o",
"--output-directory",
nargs="?",
help="Custom output directory, by default the folder where Prowler is stored",
default=default_output_directory,
)
common_outputs_parser.add_argument(
"--verbose",
action="store_true",
help="Display detailed information about findings",
)
common_outputs_parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler banner"
)
def __init_logging_parser__(self):
# Logging Options
# Both options can be combined to only report to file some log level
common_logging_parser = self.common_providers_parser.add_argument_group(
"Logging"
)
common_logging_parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="CRITICAL",
help="Select Log Level",
)
common_logging_parser.add_argument(
"--log-file",
nargs="?",
help="Set log file name",
)
def __init_exclude_checks_parser__(self):
# Exclude checks options
exclude_checks_parser = self.common_providers_parser.add_argument_group(
"Exclude checks/services to run"
)
exclude_checks_parser.add_argument(
"-e", "--excluded-checks", nargs="+", help="Checks to exclude"
)
exclude_checks_parser.add_argument(
"--excluded-services", nargs="+", help="Services to exclude"
)
def __init_checks_parser__(self):
# Set checks to execute
common_checks_parser = self.common_providers_parser.add_argument_group(
"Specify checks/services to run arguments"
)
# The following arguments needs to be set exclusivelly
group = common_checks_parser.add_mutually_exclusive_group()
group.add_argument(
"-c", "--checks", nargs="+", help="List of checks to be executed."
)
group.add_argument(
"-C",
"--checks-file",
nargs="?",
help="JSON file containing the checks to be executed. See config/checklist_example.json",
)
group.add_argument(
"-s", "--services", nargs="+", help="List of services to be executed."
)
group.add_argument(
"--severity",
nargs="+",
help="List of severities to be executed [informational, low, medium, high, critical]",
choices=["informational", "low", "medium", "high", "critical"],
)
group.add_argument(
"--compliance",
nargs="+",
help="Compliance Framework to check against for. The format should be the following: framework_version_provider (e.g.: ens_rd2022_aws)",
choices=["ens_rd2022_aws", "cis_1.4_aws", "cis_1.5_aws"],
)
group.add_argument(
"--categories",
nargs="+",
help="List of categories to be executed.",
default=[],
)
def __init_list_checks_parser__(self):
# List checks options
list_checks_parser = self.common_providers_parser.add_argument_group(
"List checks/services/categories/compliance-framework checks"
)
list_group = list_checks_parser.add_mutually_exclusive_group()
list_group.add_argument(
"-l", "--list-checks", action="store_true", help="List checks"
)
list_group.add_argument(
"--list-services", action="store_true", help="List services"
)
list_group.add_argument(
"--list-compliance", action="store_true", help="List compliance frameworks"
)
list_group.add_argument(
"--list-compliance-requirements",
nargs="+",
help="List compliance requirements for a given requirement",
choices=["ens_rd2022_aws", "cis_1.4_aws", "cis_1.5_aws"],
)
list_group.add_argument(
"--list-categories",
action="store_true",
help="List the available check's categories",
)
def __init_aws_parser__(self):
"""Init the AWS Provider CLI parser"""
aws_parser = self.subparsers.add_parser(
"aws", parents=[self.common_providers_parser], help="AWS Provider"
)
# Authentication Methods
aws_auth_subparser = aws_parser.add_argument_group("Authentication Modes")
aws_auth_subparser.add_argument(
"-p",
"--profile",
nargs="?",
default=None,
help="AWS profile to launch prowler with",
)
aws_auth_subparser.add_argument(
"-R",
"--role",
nargs="?",
default=None,
help="ARN of the role to be assumed",
)
aws_auth_subparser.add_argument(
"-T",
"--session-duration",
nargs="?",
default=3600,
type=int,
help="Assumed role session duration in seconds, must be between 900 and 43200. Default: 3600",
)
aws_auth_subparser.add_argument(
"-I",
"--external-id",
nargs="?",
default=None,
help="External ID to be passed when assuming role",
)
# AWS Regions
aws_regions_subparser = aws_parser.add_argument_group("AWS Regions")
aws_regions_subparser.add_argument(
"-f",
"--region",
"--filter-region",
nargs="+",
help="AWS region names to run Prowler against",
)
# AWS Organizations
aws_orgs_subparser = aws_parser.add_argument_group("Organizations")
aws_orgs_subparser.add_argument(
"-O",
"--organizations-role",
nargs="?",
help="Specify AWS Organizations management role ARN to be assumed, to get Organization metadata",
)
# AWS Security Hub
aws_security_hub_subparser = aws_parser.add_argument_group("Security Hub")
aws_security_hub_subparser.add_argument(
"-S",
"--security-hub",
action="store_true",
help="Send check output to AWS Security Hub",
)
# AWS Quick Inventory
aws_quick_inventory_subparser = aws_parser.add_argument_group("Quick Inventory")
aws_quick_inventory_subparser.add_argument(
"-i",
"--quick-inventory",
action="store_true",
help="Run Prowler Quick Inventory. The inventory will be stored in an output csv by default",
)
# AWS Outputs
aws_outputs_subparser = aws_parser.add_argument_group("AWS Outputs to S3")
aws_outputs_bucket_parser = aws_outputs_subparser.add_mutually_exclusive_group()
aws_outputs_bucket_parser.add_argument(
"-B",
"--output-bucket",
nargs="?",
default=None,
help="Custom output bucket, requires -M <mode> and it can work also with -o flag.",
)
aws_outputs_bucket_parser.add_argument(
"-D",
"--output-bucket-no-assume",
nargs="?",
default=None,
help="Same as -B but do not use the assumed role credentials to put objects to the bucket, instead uses the initial credentials.",
)
aws_3rd_party_subparser = aws_parser.add_argument_group(
"3rd Party Integrations"
)
aws_3rd_party_subparser.add_argument(
"-N",
"--shodan",
nargs="?",
default=None,
help="Shodan API key used by check ec2_elastic_ip_shodan.",
)
def __init_azure_parser__(self):
"""Init the Azure Provider CLI parser"""
azure_parser = self.subparsers.add_parser(
"azure", parents=[self.common_providers_parser], help="Azure Provider"
)
# Authentication Modes
azure_auth_subparser = azure_parser.add_argument_group("Authentication Modes")
azure_auth_modes_group = azure_auth_subparser.add_mutually_exclusive_group()
azure_auth_modes_group.add_argument(
"--az-cli-auth",
action="store_true",
help="Use Azure cli credentials to log in against azure",
)
azure_auth_modes_group.add_argument(
"--sp-env-auth",
action="store_true",
help="Use service principal env variables authentication to log in against azure",
)
azure_auth_modes_group.add_argument(
"--browser-auth",
action="store_true",
help="Use browser authentication to log in against azure ",
)
azure_auth_modes_group.add_argument(
"--managed-identity-auth",
action="store_true",
help="Use managed identity authentication to log in against azure ",
)
# Subscriptions
azure_subscriptions_subparser = azure_parser.add_argument_group("Subscriptions")
azure_subscriptions_subparser.add_argument(
"--subscription-ids",
nargs="+",
default=[],
help="Azure subscription ids to be scanned by prowler",
)

View File

@@ -21,7 +21,6 @@ from prowler.config.config import (
timestamp_iso,
timestamp_utc,
)
from prowler.lib.check.models import Output_From_Options
from prowler.lib.logger import logger
from prowler.lib.outputs.models import (
Check_Output_CSV,
@@ -38,6 +37,7 @@ from prowler.lib.utils.utils import file_exists, hash_sha512, open_file
from prowler.providers.aws.lib.allowlist.allowlist import is_allowlisted
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.lib.security_hub.security_hub import send_to_security_hub
from prowler.providers.common.outputs import Provider_Output_Options
def report(check_findings, output_options, audit_info):
@@ -512,7 +512,7 @@ def send_to_s3_bucket(
def display_summary_table(
findings: list,
audit_info,
output_options: Output_From_Options,
output_options: Provider_Output_Options,
provider: str,
):
output_directory = output_options.output_directory

View File

@@ -1,4 +1,3 @@
import importlib
import sys
from arnparse import arnparse
@@ -21,7 +20,7 @@ from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
class Audit_Info:
def __init__(self):
logger.info("Instantiating audit info")
logger.info("Setting Audit Info ...")
def validate_credentials(self, validate_session: session) -> dict:
try:
@@ -44,11 +43,11 @@ class Audit_Info:
profile = audit_info.profile if audit_info.profile is not None else "default"
report = f"""
This report is being generated using credentials below:
This report is being generated using credentials below:
AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL}
AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL}
Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL}
AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL}
AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL}
Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL}
"""
# If -A is set, print Assumed Role ARN
if audit_info.assumed_role_info.role_arn is not None:
@@ -92,13 +91,30 @@ class Audit_Info:
)
return organizations_info
def set_aws_audit_info(self, arguments):
input_profile = arguments["profile"]
input_role = arguments["role"]
input_session_duration = arguments["session_duration"]
input_external_id = arguments["external_id"]
input_regions = arguments["regions"]
organizations_role_arn = arguments["organizations_role"]
def set_aws_audit_info(self, arguments) -> AWS_Audit_Info:
"""
set_aws_audit_info returns the AWS_Audit_Info
"""
logger.info("Setting Azure session ...")
# Assume Role Options
input_role = arguments.get("role")
input_session_duration = arguments.get("session_duration")
input_external_id = arguments.get("external_id")
print(input_session_duration)
if input_session_duration and input_session_duration not in range(900, 43200):
raise Exception("Value for -T option must be between 900 and 43200")
if (
input_session_duration and input_session_duration != 3600
) or input_external_id:
if not input_role:
raise Exception("To use -I/-T options -R option is needed")
input_profile = arguments.get("profile")
input_regions = arguments.get("regions")
organizations_role_arn = arguments.get("organizations_role")
# Assumed AWS session
assumed_session = None
@@ -218,12 +234,26 @@ class Audit_Info:
return current_audit_info
def set_azure_audit_info(self, arguments) -> Azure_Audit_Info:
"""
set_azure_audit_info returns the Azure_Audit_Info
"""
logger.info("Setting Azure session ...")
subscription_ids = arguments["subscriptions"]
az_cli_auth = arguments["az_cli_auth"]
sp_env_auth = arguments["sp_env_auth"]
browser_auth = arguments["browser_auth"]
managed_entity_auth = arguments["managed_entity_auth"]
subscription_ids = arguments.get("subscriptions")
logger.info("Checking if any credentials mode is set ...")
az_cli_auth = arguments.get("az_cli_auth")
sp_env_auth = arguments.get("sp_env_auth")
browser_auth = arguments.get("browser_auth")
managed_entity_auth = arguments.get("managed_entity_auth")
if (
not az_cli_auth
and not sp_env_auth
and not browser_auth
and not managed_entity_auth
):
raise Exception(
"Azure provider requires at least one authentication method set: [--az-cli-auth | --sp-env-auth | --browser-auth | --managed-identity-auth]"
)
azure_provider = Azure_Provider(
az_cli_auth,
@@ -239,17 +269,16 @@ class Audit_Info:
def set_provider_audit_info(provider: str, arguments: dict):
"""
set_provider_audit_info configures automatically the audit session based on the selected provider and returns the audit_info object.
"""
try:
provider_set_audit_info = f"set_{provider}_audit_info"
provider_audit_info = getattr(Audit_Info(), provider_set_audit_info)(arguments)
except Exception as error:
logger.error(
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit()
else:
return provider_audit_info
def import_lib(path: str):
lib = importlib.import_module(path)
return lib

View File

@@ -0,0 +1,93 @@
import importlib
import sys
from dataclasses import dataclass
from os import mkdir
from os.path import isdir
from prowler.config.config import change_config_var, output_file_timestamp
from prowler.lib.logger import logger
def set_provider_output_options(
provider: str, arguments, audit_info, allowlist_file, bulk_checks_metadata
):
"""
set_provider_output_options configures automatically the outputs based on the selected provider and returns the Provider_Output_Options object.
"""
try:
# Dynamically load the Provider_Output_Options class
provider_output_class = f"{provider.capitalize()}_Output_Options"
provider_output_options = getattr(
importlib.import_module(__name__), provider_output_class
)(arguments, audit_info, allowlist_file, bulk_checks_metadata)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit()
else:
return provider_output_options
@dataclass
class Provider_Output_Options:
is_quiet: bool
output_modes: list
output_directory: str
allowlist_file: str
bulk_checks_metadata: dict
verbose: str
output_filename: str
def __init__(self, arguments, allowlist_file, bulk_checks_metadata):
self.is_quiet = arguments.quiet
self.output_modes = arguments.output_modes
self.output_directory = arguments.output_directory
self.verbose = arguments.verbose
self.bulk_checks_metadata = bulk_checks_metadata
self.allowlist_file = allowlist_file
# Check output directory, if it is not created -> create it
if arguments.output_directory:
if not isdir(arguments.output_directory):
if arguments.output_modes:
mkdir(arguments.output_directory)
class Azure_Output_Options(Provider_Output_Options):
def __init__(self, arguments, audit_info, allowlist_file, bulk_checks_metadata):
# First call Provider_Output_Options init
super().__init__(arguments, allowlist_file, bulk_checks_metadata)
# Check if custom output filename was input, if not, set the default
if not arguments.output_filename:
if audit_info.identity.domain:
self.output_filename = f"prowler-output-{audit_info.identity.domain}-{output_file_timestamp}"
else:
self.output_filename = f"prowler-output-{'-'.join(audit_info.identity.tenant_ids)}-{output_file_timestamp}"
class Aws_Output_Options(Provider_Output_Options):
security_hub_enabled: bool
def __init__(self, arguments, audit_info, allowlist_file, bulk_checks_metadata):
# First call Provider_Output_Options init
super().__init__(arguments, allowlist_file, bulk_checks_metadata)
# Confire Shodan API
if arguments.shodan:
change_config_var("shodan_api_key", arguments.shodan)
# Check if custom output filename was input, if not, set the default
if not arguments.output_filename:
self.output_filename = (
f"prowler-output-{audit_info.audited_account}-{output_file_timestamp}"
)
# Security Hub Outputs
self.security_hub_enabled = arguments.security_hub
if arguments.security_hub:
if not self.output_modes:
self.output_modes = ["json-asff"]
else:
self.output_modes.append("json-asff")

View File

@@ -11,7 +11,7 @@ from prowler.providers.azure.lib.audit_info.models import (
Azure_Audit_Info,
Azure_Identity_Info,
)
from prowler.providers.common.common import Audit_Info, set_provider_audit_info
from prowler.providers.common.audit_info import Audit_Info, set_provider_audit_info
ACCOUNT_ID = 123456789012
mock_current_audit_info = AWS_Audit_Info(
@@ -59,7 +59,7 @@ def mock_set_credentials(*_):
class Test_Set_Audit_Info:
@patch(
"prowler.providers.common.common.current_audit_info",
"prowler.providers.common.audit_info.current_audit_info",
new=mock_current_audit_info,
)
@mock_sts
@@ -88,7 +88,7 @@ class Test_Set_Audit_Info:
# assert get_caller_identity["UserId"] == str(ACCOUNT_ID)
@patch(
"prowler.providers.common.common.current_audit_info",
"prowler.providers.common.audit_info.current_audit_info",
new=mock_current_audit_info,
)
@mock_organizations
@@ -147,7 +147,7 @@ class Test_Set_Audit_Info:
org.account_details_tags.should.equal("key:value,")
@patch(
"prowler.providers.common.common.current_audit_info",
"prowler.providers.common.audit_info.current_audit_info",
new=mock_current_audit_info,
)
@patch.object(Audit_Info, "validate_credentials", new=mock_validate_credentials)
@@ -174,7 +174,8 @@ class Test_Set_Audit_Info:
assert isinstance(audit_info, AWS_Audit_Info)
@patch(
"prowler.providers.common.common.azure_audit_info", new=mock_azure_audit_info
"prowler.providers.common.audit_info.azure_audit_info",
new=mock_azure_audit_info,
)
@patch.object(Azure_Provider, "__set_credentials__", new=mock_set_credentials)
@patch.object(Azure_Provider, "__set_identity_info__", new=mock_set_identity_info)
@@ -188,7 +189,8 @@ class Test_Set_Audit_Info:
"regions": None,
"organizations_role": None,
"subscriptions": None,
"az_cli_auth": None,
# We need to set exactly one auth method
"az_cli_auth": True,
"sp_env_auth": None,
"browser_auth": None,
"managed_entity_auth": None,