mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
chore(parser): Move provider logic to their folder (#2746)
This commit is contained in:
@@ -8,8 +8,10 @@ from prowler.config.config import (
|
||||
default_config_file_path,
|
||||
default_output_directory,
|
||||
)
|
||||
from prowler.providers.aws.aws_provider import get_aws_available_regions
|
||||
from prowler.providers.aws.lib.arn.arn import arn_type
|
||||
from prowler.providers.common.arguments import (
|
||||
init_providers_parser,
|
||||
validate_provider_arguments,
|
||||
)
|
||||
|
||||
|
||||
class ProwlerArgumentParser:
|
||||
@@ -49,9 +51,7 @@ Detailed documentation at https://docs.prowler.cloud
|
||||
self.__init_config_parser__()
|
||||
|
||||
# Init Providers Arguments
|
||||
self.__init_aws_parser__()
|
||||
self.__init_azure_parser__()
|
||||
self.__init_gcp_parser__()
|
||||
init_providers_parser(self)
|
||||
|
||||
def parse(self, args=None) -> argparse.Namespace:
|
||||
"""
|
||||
@@ -93,6 +93,11 @@ Detailed documentation at https://docs.prowler.cloud
|
||||
if args.only_logs or args.list_checks_json:
|
||||
args.no_banner = True
|
||||
|
||||
# Extra validation for provider arguments
|
||||
valid, message = validate_provider_arguments(args)
|
||||
if not valid:
|
||||
self.parser.error(f"{args.provider}: {message}")
|
||||
|
||||
return args
|
||||
|
||||
def __set_default_provider__(self, args: list) -> list:
|
||||
@@ -193,7 +198,7 @@ Detailed documentation at https://docs.prowler.cloud
|
||||
def __init_checks_parser__(self):
|
||||
# Set checks to execute
|
||||
common_checks_parser = self.common_providers_parser.add_argument_group(
|
||||
"Specify checks/services to run arguments"
|
||||
"Specify checks/services to run"
|
||||
)
|
||||
# The following arguments needs to be set exclusivelly
|
||||
group = common_checks_parser.add_mutually_exclusive_group()
|
||||
@@ -275,222 +280,3 @@ Detailed documentation at https://docs.prowler.cloud
|
||||
default=default_config_file_path,
|
||||
help="Set configuration file path",
|
||||
)
|
||||
|
||||
def __init_aws_parser__(self):
|
||||
"""Init the AWS Provider CLI parser"""
|
||||
aws_parser = self.subparsers.add_parser(
|
||||
"aws", parents=[self.common_providers_parser], help="AWS Provider"
|
||||
)
|
||||
# Authentication Methods
|
||||
aws_auth_subparser = aws_parser.add_argument_group("Authentication Modes")
|
||||
aws_auth_subparser.add_argument(
|
||||
"-p",
|
||||
"--profile",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="AWS profile to launch prowler with",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-R",
|
||||
"--role",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="ARN of the role to be assumed",
|
||||
# Pending ARN validation
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"--sts-endpoint-region",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Specify the AWS STS endpoint region to use. Read more at https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_enable-regions.html",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"--mfa",
|
||||
action="store_true",
|
||||
help="IAM entity enforces MFA so you need to input the MFA ARN and the TOTP",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-T",
|
||||
"--session-duration",
|
||||
nargs="?",
|
||||
default=3600,
|
||||
type=int,
|
||||
help="Assumed role session duration in seconds, must be between 900 and 43200. Default: 3600",
|
||||
# Pending session duration validation
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-I",
|
||||
"--external-id",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="External ID to be passed when assuming role",
|
||||
)
|
||||
# AWS Regions
|
||||
aws_regions_subparser = aws_parser.add_argument_group("AWS Regions")
|
||||
aws_regions_subparser.add_argument(
|
||||
"-f",
|
||||
"--region",
|
||||
"--filter-region",
|
||||
nargs="+",
|
||||
help="AWS region names to run Prowler against",
|
||||
choices=get_aws_available_regions(),
|
||||
)
|
||||
# AWS Organizations
|
||||
aws_orgs_subparser = aws_parser.add_argument_group("AWS Organizations")
|
||||
aws_orgs_subparser.add_argument(
|
||||
"-O",
|
||||
"--organizations-role",
|
||||
nargs="?",
|
||||
help="Specify AWS Organizations management role ARN to be assumed, to get Organization metadata",
|
||||
)
|
||||
# AWS Security Hub
|
||||
aws_security_hub_subparser = aws_parser.add_argument_group("AWS Security Hub")
|
||||
aws_security_hub_subparser.add_argument(
|
||||
"-S",
|
||||
"--security-hub",
|
||||
action="store_true",
|
||||
help="Send check output to AWS Security Hub",
|
||||
)
|
||||
aws_security_hub_subparser.add_argument(
|
||||
"--skip-sh-update",
|
||||
action="store_true",
|
||||
help="Skip updating previous findings of Prowler in Security Hub",
|
||||
)
|
||||
# AWS Quick Inventory
|
||||
aws_quick_inventory_subparser = aws_parser.add_argument_group("Quick Inventory")
|
||||
aws_quick_inventory_subparser.add_argument(
|
||||
"-i",
|
||||
"--quick-inventory",
|
||||
action="store_true",
|
||||
help="Run Prowler Quick Inventory. The inventory will be stored in an output csv by default",
|
||||
)
|
||||
# AWS Outputs
|
||||
aws_outputs_subparser = aws_parser.add_argument_group("AWS Outputs to S3")
|
||||
aws_outputs_bucket_parser = aws_outputs_subparser.add_mutually_exclusive_group()
|
||||
aws_outputs_bucket_parser.add_argument(
|
||||
"-B",
|
||||
"--output-bucket",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Custom output bucket, requires -M <mode> and it can work also with -o flag.",
|
||||
)
|
||||
aws_outputs_bucket_parser.add_argument(
|
||||
"-D",
|
||||
"--output-bucket-no-assume",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Same as -B but do not use the assumed role credentials to put objects to the bucket, instead uses the initial credentials.",
|
||||
)
|
||||
aws_3rd_party_subparser = aws_parser.add_argument_group(
|
||||
"3rd Party Integrations"
|
||||
)
|
||||
aws_3rd_party_subparser.add_argument(
|
||||
"-N",
|
||||
"--shodan",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Shodan API key used by check ec2_elastic_ip_shodan.",
|
||||
)
|
||||
# Allowlist
|
||||
allowlist_subparser = aws_parser.add_argument_group("Allowlist")
|
||||
allowlist_subparser.add_argument(
|
||||
"-w",
|
||||
"--allowlist-file",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Path for allowlist yaml file. See example prowler/config/allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table or Lambda ARNs or S3 URIs, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
|
||||
)
|
||||
# Based Scans
|
||||
aws_based_scans_subparser = aws_parser.add_argument_group("AWS Based Scans")
|
||||
aws_based_scans_parser = (
|
||||
aws_based_scans_subparser.add_mutually_exclusive_group()
|
||||
)
|
||||
aws_based_scans_parser.add_argument(
|
||||
"--resource-tags",
|
||||
nargs="+",
|
||||
default=None,
|
||||
help="Scan only resources with specific AWS Tags (Key=Value), e.g., Environment=dev Project=prowler",
|
||||
)
|
||||
aws_based_scans_parser.add_argument(
|
||||
"--resource-arn",
|
||||
nargs="+",
|
||||
type=arn_type,
|
||||
default=None,
|
||||
help="Scan only resources with specific AWS Resource ARNs, e.g., arn:aws:iam::012345678910:user/test arn:aws:ec2:us-east-1:123456789012:vpc/vpc-12345678",
|
||||
)
|
||||
|
||||
# Boto3 Config
|
||||
boto3_config_subparser = aws_parser.add_argument_group("Boto3 Config")
|
||||
boto3_config_subparser.add_argument(
|
||||
"--aws-retries-max-attempts",
|
||||
nargs="?",
|
||||
default=None,
|
||||
type=int,
|
||||
help="Set the maximum attemps for the Boto3 standard retrier config (Default: 3)",
|
||||
)
|
||||
|
||||
def __init_azure_parser__(self):
|
||||
"""Init the Azure Provider CLI parser"""
|
||||
azure_parser = self.subparsers.add_parser(
|
||||
"azure", parents=[self.common_providers_parser], help="Azure Provider"
|
||||
)
|
||||
# Authentication Modes
|
||||
azure_auth_subparser = azure_parser.add_argument_group("Authentication Modes")
|
||||
azure_auth_modes_group = azure_auth_subparser.add_mutually_exclusive_group()
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--az-cli-auth",
|
||||
action="store_true",
|
||||
help="Use Azure cli credentials to log in against azure",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--sp-env-auth",
|
||||
action="store_true",
|
||||
help="Use service principal env variables authentication to log in against azure",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--browser-auth",
|
||||
action="store_true",
|
||||
help="Use browser authentication to log in against Azure, --tenant-id is required for this option",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--managed-identity-auth",
|
||||
action="store_true",
|
||||
help="Use managed identity authentication to log in against azure ",
|
||||
)
|
||||
# Subscriptions
|
||||
azure_subscriptions_subparser = azure_parser.add_argument_group("Subscriptions")
|
||||
azure_subscriptions_subparser.add_argument(
|
||||
"--subscription-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="Azure Subscription IDs to be scanned by Prowler",
|
||||
)
|
||||
azure_parser.add_argument(
|
||||
"--tenant-id",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Azure Tenant ID to be used with --browser-auth option",
|
||||
)
|
||||
|
||||
def __init_gcp_parser__(self):
|
||||
"""Init the GCP Provider CLI parser"""
|
||||
gcp_parser = self.subparsers.add_parser(
|
||||
"gcp", parents=[self.common_providers_parser], help="GCP Provider"
|
||||
)
|
||||
# Authentication Modes
|
||||
gcp_auth_subparser = gcp_parser.add_argument_group("Authentication Modes")
|
||||
gcp_auth_modes_group = gcp_auth_subparser.add_mutually_exclusive_group()
|
||||
gcp_auth_modes_group.add_argument(
|
||||
"--credentials-file",
|
||||
nargs="?",
|
||||
metavar="FILE_PATH",
|
||||
help="Authenticate using a Google Service Account Application Credentials JSON file",
|
||||
)
|
||||
# Subscriptions
|
||||
gcp_subscriptions_subparser = gcp_parser.add_argument_group("Projects")
|
||||
gcp_subscriptions_subparser.add_argument(
|
||||
"--project-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="GCP Project IDs to be scanned by Prowler",
|
||||
)
|
||||
|
||||
0
prowler/providers/aws/lib/arguments/__init__.py
Normal file
0
prowler/providers/aws/lib/arguments/__init__.py
Normal file
176
prowler/providers/aws/lib/arguments/arguments.py
Normal file
176
prowler/providers/aws/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,176 @@
|
||||
from argparse import ArgumentTypeError, Namespace
|
||||
|
||||
from prowler.providers.aws.aws_provider import get_aws_available_regions
|
||||
from prowler.providers.aws.lib.arn.arn import arn_type
|
||||
|
||||
|
||||
def init_parser(self):
|
||||
"""Init the AWS Provider CLI parser"""
|
||||
aws_parser = self.subparsers.add_parser(
|
||||
"aws", parents=[self.common_providers_parser], help="AWS Provider"
|
||||
)
|
||||
# Authentication Methods
|
||||
aws_auth_subparser = aws_parser.add_argument_group("Authentication Modes")
|
||||
aws_auth_subparser.add_argument(
|
||||
"-p",
|
||||
"--profile",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="AWS profile to launch prowler with",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-R",
|
||||
"--role",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="ARN of the role to be assumed",
|
||||
# Pending ARN validation
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"--sts-endpoint-region",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Specify the AWS STS endpoint region to use. Read more at https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_enable-regions.html",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"--mfa",
|
||||
action="store_true",
|
||||
help="IAM entity enforces MFA so you need to input the MFA ARN and the TOTP",
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-T",
|
||||
"--session-duration",
|
||||
nargs="?",
|
||||
default=3600,
|
||||
type=validate_session_duration,
|
||||
help="Assumed role session duration in seconds, must be between 900 and 43200. Default: 3600",
|
||||
# Pending session duration validation
|
||||
)
|
||||
aws_auth_subparser.add_argument(
|
||||
"-I",
|
||||
"--external-id",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="External ID to be passed when assuming role",
|
||||
)
|
||||
# AWS Regions
|
||||
aws_regions_subparser = aws_parser.add_argument_group("AWS Regions")
|
||||
aws_regions_subparser.add_argument(
|
||||
"-f",
|
||||
"--region",
|
||||
"--filter-region",
|
||||
nargs="+",
|
||||
help="AWS region names to run Prowler against",
|
||||
choices=get_aws_available_regions(),
|
||||
)
|
||||
# AWS Organizations
|
||||
aws_orgs_subparser = aws_parser.add_argument_group("AWS Organizations")
|
||||
aws_orgs_subparser.add_argument(
|
||||
"-O",
|
||||
"--organizations-role",
|
||||
nargs="?",
|
||||
help="Specify AWS Organizations management role ARN to be assumed, to get Organization metadata",
|
||||
)
|
||||
# AWS Security Hub
|
||||
aws_security_hub_subparser = aws_parser.add_argument_group("AWS Security Hub")
|
||||
aws_security_hub_subparser.add_argument(
|
||||
"-S",
|
||||
"--security-hub",
|
||||
action="store_true",
|
||||
help="Send check output to AWS Security Hub",
|
||||
)
|
||||
aws_security_hub_subparser.add_argument(
|
||||
"--skip-sh-update",
|
||||
action="store_true",
|
||||
help="Skip updating previous findings of Prowler in Security Hub",
|
||||
)
|
||||
# AWS Quick Inventory
|
||||
aws_quick_inventory_subparser = aws_parser.add_argument_group("Quick Inventory")
|
||||
aws_quick_inventory_subparser.add_argument(
|
||||
"-i",
|
||||
"--quick-inventory",
|
||||
action="store_true",
|
||||
help="Run Prowler Quick Inventory. The inventory will be stored in an output csv by default",
|
||||
)
|
||||
# AWS Outputs
|
||||
aws_outputs_subparser = aws_parser.add_argument_group("AWS Outputs to S3")
|
||||
aws_outputs_bucket_parser = aws_outputs_subparser.add_mutually_exclusive_group()
|
||||
aws_outputs_bucket_parser.add_argument(
|
||||
"-B",
|
||||
"--output-bucket",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Custom output bucket, requires -M <mode> and it can work also with -o flag.",
|
||||
)
|
||||
aws_outputs_bucket_parser.add_argument(
|
||||
"-D",
|
||||
"--output-bucket-no-assume",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Same as -B but do not use the assumed role credentials to put objects to the bucket, instead uses the initial credentials.",
|
||||
)
|
||||
aws_3rd_party_subparser = aws_parser.add_argument_group("3rd Party Integrations")
|
||||
aws_3rd_party_subparser.add_argument(
|
||||
"-N",
|
||||
"--shodan",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Shodan API key used by check ec2_elastic_ip_shodan.",
|
||||
)
|
||||
# Allowlist
|
||||
allowlist_subparser = aws_parser.add_argument_group("Allowlist")
|
||||
allowlist_subparser.add_argument(
|
||||
"-w",
|
||||
"--allowlist-file",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Path for allowlist yaml file. See example prowler/config/allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table or Lambda ARNs or S3 URIs, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
|
||||
)
|
||||
# Based Scans
|
||||
aws_based_scans_subparser = aws_parser.add_argument_group("AWS Based Scans")
|
||||
aws_based_scans_parser = aws_based_scans_subparser.add_mutually_exclusive_group()
|
||||
aws_based_scans_parser.add_argument(
|
||||
"--resource-tags",
|
||||
nargs="+",
|
||||
default=None,
|
||||
help="Scan only resources with specific AWS Tags (Key=Value), e.g., Environment=dev Project=prowler",
|
||||
)
|
||||
aws_based_scans_parser.add_argument(
|
||||
"--resource-arn",
|
||||
nargs="+",
|
||||
type=arn_type,
|
||||
default=None,
|
||||
help="Scan only resources with specific AWS Resource ARNs, e.g., arn:aws:iam::012345678910:user/test arn:aws:ec2:us-east-1:123456789012:vpc/vpc-12345678",
|
||||
)
|
||||
|
||||
# Boto3 Config
|
||||
boto3_config_subparser = aws_parser.add_argument_group("Boto3 Config")
|
||||
boto3_config_subparser.add_argument(
|
||||
"--aws-retries-max-attempts",
|
||||
nargs="?",
|
||||
default=None,
|
||||
type=int,
|
||||
help="Set the maximum attemps for the Boto3 standard retrier config (Default: 3)",
|
||||
)
|
||||
|
||||
|
||||
def validate_session_duration(duration):
|
||||
"""validate_session_duration validates that the AWS STS Assume Role Session Duration is between 900 and 43200 seconds."""
|
||||
duration = int(duration)
|
||||
# Since the range(i,j) goes from i to j-1 we have to j+1
|
||||
if duration not in range(900, 43201):
|
||||
raise ArgumentTypeError("Session duration must be between 900 and 43200")
|
||||
return duration
|
||||
|
||||
|
||||
def validate_arguments(arguments: Namespace) -> tuple[bool, str]:
|
||||
"""validate_arguments returns {True, "} if the provider arguments passed are valid and can be used together. It performs an extra validation, specific for the AWS provider, apart from the argparse lib."""
|
||||
|
||||
# Handle if session_duration is not the default value or external_id is set
|
||||
if (
|
||||
arguments.session_duration and arguments.session_duration != 3600
|
||||
) or arguments.external_id:
|
||||
if not arguments.role:
|
||||
return (False, "To use -I/-T options -R option is needed")
|
||||
|
||||
return (True, "")
|
||||
@@ -1,5 +1,5 @@
|
||||
import re
|
||||
from argparse import ArgumentError
|
||||
from argparse import ArgumentTypeError
|
||||
|
||||
from prowler.providers.aws.lib.arn.error import (
|
||||
RoleArnParsingEmptyResource,
|
||||
@@ -15,7 +15,7 @@ from prowler.providers.aws.lib.arn.models import ARN
|
||||
def arn_type(arn: str) -> bool:
|
||||
"""arn_type returns a string ARN if it is valid and raises an argparse.ArgumentError if not."""
|
||||
if not is_valid_arn(arn):
|
||||
raise ArgumentError("Invalid ARN")
|
||||
raise ArgumentTypeError(f"Invalid ARN {arn}")
|
||||
return arn
|
||||
|
||||
|
||||
|
||||
0
prowler/providers/azure/lib/arguments/__init__.py
Normal file
0
prowler/providers/azure/lib/arguments/__init__.py
Normal file
42
prowler/providers/azure/lib/arguments/arguments.py
Normal file
42
prowler/providers/azure/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,42 @@
|
||||
def init_parser(self):
|
||||
"""Init the Azure Provider CLI parser"""
|
||||
azure_parser = self.subparsers.add_parser(
|
||||
"azure", parents=[self.common_providers_parser], help="Azure Provider"
|
||||
)
|
||||
# Authentication Modes
|
||||
azure_auth_subparser = azure_parser.add_argument_group("Authentication Modes")
|
||||
azure_auth_modes_group = azure_auth_subparser.add_mutually_exclusive_group()
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--az-cli-auth",
|
||||
action="store_true",
|
||||
help="Use Azure cli credentials to log in against azure",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--sp-env-auth",
|
||||
action="store_true",
|
||||
help="Use service principal env variables authentication to log in against azure",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--browser-auth",
|
||||
action="store_true",
|
||||
help="Use browser authentication to log in against Azure, --tenant-id is required for this option",
|
||||
)
|
||||
azure_auth_modes_group.add_argument(
|
||||
"--managed-identity-auth",
|
||||
action="store_true",
|
||||
help="Use managed identity authentication to log in against azure ",
|
||||
)
|
||||
# Subscriptions
|
||||
azure_subscriptions_subparser = azure_parser.add_argument_group("Subscriptions")
|
||||
azure_subscriptions_subparser.add_argument(
|
||||
"--subscription-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="Azure Subscription IDs to be scanned by Prowler",
|
||||
)
|
||||
azure_parser.add_argument(
|
||||
"--tenant-id",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Azure Tenant ID to be used with --browser-auth option",
|
||||
)
|
||||
58
prowler/providers/common/arguments.py
Normal file
58
prowler/providers/common/arguments.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import sys
|
||||
from argparse import Namespace
|
||||
from importlib import import_module
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.common.common import (
|
||||
get_available_providers,
|
||||
providers_prowler_lib_path,
|
||||
)
|
||||
|
||||
provider_arguments_lib_path = "lib.arguments.arguments"
|
||||
validate_provider_arguments_function = "validate_arguments"
|
||||
init_provider_arguments_function = "init_parser"
|
||||
|
||||
|
||||
def init_providers_parser(self):
|
||||
"""init_providers_parser calls the provider init_parser function to load all the arguments and flags. Receives a ProwlerArgumentParser object"""
|
||||
# We need to call the arguments parser for each provider
|
||||
providers = get_available_providers()
|
||||
for provider in providers:
|
||||
try:
|
||||
getattr(
|
||||
import_module(
|
||||
f"{providers_prowler_lib_path}.{provider}.{provider_arguments_lib_path}"
|
||||
),
|
||||
init_provider_arguments_function,
|
||||
)(self)
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def validate_provider_arguments(arguments: Namespace) -> tuple[bool, str]:
|
||||
"""validate_provider_arguments returns {True, "} if the provider arguments passed are valid and can be used together"""
|
||||
try:
|
||||
# Provider function must be located at prowler.providers.<provider>.lib.arguments.arguments.validate_arguments
|
||||
return getattr(
|
||||
import_module(
|
||||
f"{providers_prowler_lib_path}.{arguments.provider}.{provider_arguments_lib_path}"
|
||||
),
|
||||
validate_provider_arguments_function,
|
||||
)(arguments)
|
||||
|
||||
# If the provider does not have a lib.arguments package we return (True, "")
|
||||
except ModuleNotFoundError:
|
||||
return (True, "")
|
||||
|
||||
# If the provider does not have a validate_arguments we return (True, "")
|
||||
except AttributeError:
|
||||
return (True, "")
|
||||
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
sys.exit(1)
|
||||
@@ -84,17 +84,6 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
|
||||
# STS Endpoint Region
|
||||
sts_endpoint_region = arguments.get("sts_endpoint_region")
|
||||
|
||||
# Since the range(i,j) goes from i to j-1 we have to j+1
|
||||
if input_session_duration and input_session_duration not in range(900, 43201):
|
||||
raise Exception("Value for -T option must be between 900 and 43200")
|
||||
|
||||
# Handle if session_duration is not the default value or external_id is set
|
||||
if (
|
||||
input_session_duration and input_session_duration != 3600
|
||||
) or input_external_id:
|
||||
if not input_role:
|
||||
raise Exception("To use -I/-T options -R option is needed")
|
||||
|
||||
# MFA Configuration (false by default)
|
||||
input_mfa = arguments.get("mfa")
|
||||
current_audit_info.mfa_enabled = input_mfa
|
||||
|
||||
14
prowler/providers/common/common.py
Normal file
14
prowler/providers/common/common.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from importlib import import_module
|
||||
|
||||
providers_prowler_lib_path = "prowler.providers"
|
||||
|
||||
|
||||
def get_available_providers() -> list[str]:
|
||||
"""get_available_providers returns a list of the available providers"""
|
||||
providers_list = import_module(f"{providers_prowler_lib_path}")
|
||||
providers = [
|
||||
provider
|
||||
for provider in providers_list.__dict__
|
||||
if not (provider.startswith("__") or provider.startswith("common"))
|
||||
]
|
||||
return providers
|
||||
0
prowler/providers/gcp/lib/arguments/__init__.py
Normal file
0
prowler/providers/gcp/lib/arguments/__init__.py
Normal file
22
prowler/providers/gcp/lib/arguments/arguments.py
Normal file
22
prowler/providers/gcp/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,22 @@
|
||||
def init_parser(self):
|
||||
"""Init the GCP Provider CLI parser"""
|
||||
gcp_parser = self.subparsers.add_parser(
|
||||
"gcp", parents=[self.common_providers_parser], help="GCP Provider"
|
||||
)
|
||||
# Authentication Modes
|
||||
gcp_auth_subparser = gcp_parser.add_argument_group("Authentication Modes")
|
||||
gcp_auth_modes_group = gcp_auth_subparser.add_mutually_exclusive_group()
|
||||
gcp_auth_modes_group.add_argument(
|
||||
"--credentials-file",
|
||||
nargs="?",
|
||||
metavar="FILE_PATH",
|
||||
help="Authenticate using a Google Service Account Application Credentials JSON file",
|
||||
)
|
||||
# Subscriptions
|
||||
gcp_subscriptions_subparser = gcp_parser.add_argument_group("Projects")
|
||||
gcp_subscriptions_subparser.add_argument(
|
||||
"--project-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="GCP Project IDs to be scanned by Prowler",
|
||||
)
|
||||
@@ -1,15 +1,32 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from mock import patch
|
||||
|
||||
from prowler.lib.cli.parser import ProwlerArgumentParser
|
||||
|
||||
prowler_command = "prowler"
|
||||
|
||||
# capsys
|
||||
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
|
||||
prowler_default_usage_error = "usage: prowler [-h] [-v] {aws,azure,gcp} ..."
|
||||
|
||||
|
||||
def mock_get_available_providers():
|
||||
return ["aws", "azure", "gcp"]
|
||||
|
||||
|
||||
class Test_Parser:
|
||||
# Init parser
|
||||
def setup_method(self):
|
||||
# We need this to mock the get_available_providers function call
|
||||
# since the importlib.import_module is not working starting from the test class
|
||||
self.patch_get_available_providers = patch(
|
||||
"prowler.providers.common.arguments.get_available_providers",
|
||||
new=mock_get_available_providers,
|
||||
)
|
||||
self.patch_get_available_providers.start()
|
||||
|
||||
# Init parser
|
||||
self.parser = ProwlerArgumentParser()
|
||||
|
||||
def test_default_parser_no_arguments_aws(self):
|
||||
@@ -526,7 +543,7 @@ class Test_Parser:
|
||||
assert severity_1 in parsed.severity
|
||||
assert severity_2 in parsed.severity
|
||||
|
||||
def test_checks_parser_wrong_severity(self):
|
||||
def test_checks_parser_wrong_severity(self, capsys):
|
||||
argument = "--severity"
|
||||
severity = "kk"
|
||||
command = [prowler_command, argument, severity]
|
||||
@@ -689,22 +706,33 @@ class Test_Parser:
|
||||
parsed = self.parser.parse(command)
|
||||
assert parsed.mfa
|
||||
|
||||
def test_aws_parser_session_duration_short(self):
|
||||
def test_aws_parser_session_duration_short(self, capsys):
|
||||
argument = "-T"
|
||||
duration = "900"
|
||||
command = [prowler_command, argument, duration]
|
||||
parsed = self.parser.parse(command)
|
||||
assert parsed.session_duration == int(duration)
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: aws: To use -I/-T options -R option is needed\n"
|
||||
)
|
||||
|
||||
def test_aws_parser_session_duration_long(self):
|
||||
def test_aws_parser_session_duration_long(self, capsys):
|
||||
argument = "--session-duration"
|
||||
duration = "900"
|
||||
command = [prowler_command, argument, duration]
|
||||
parsed = self.parser.parse(command)
|
||||
assert parsed.session_duration == int(duration)
|
||||
|
||||
# Pending Session Duration validation during parse to test input out of range
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: aws: To use -I/-T options -R option is needed\n"
|
||||
)
|
||||
|
||||
# TODO
|
||||
def test_aws_parser_external_id_no_short(self):
|
||||
argument = "-I"
|
||||
external_id = ""
|
||||
@@ -712,19 +740,31 @@ class Test_Parser:
|
||||
parsed = self.parser.parse(command)
|
||||
assert not parsed.profile
|
||||
|
||||
def test_aws_parser_external_id_short(self):
|
||||
def test_aws_parser_external_id_short(self, capsys):
|
||||
argument = "-I"
|
||||
external_id = str(uuid.uuid4())
|
||||
command = [prowler_command, argument, external_id]
|
||||
parsed = self.parser.parse(command)
|
||||
assert parsed.external_id == external_id
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: aws: To use -I/-T options -R option is needed\n"
|
||||
)
|
||||
|
||||
def test_aws_parser_external_id_long(self):
|
||||
def test_aws_parser_external_id_long(self, capsys):
|
||||
argument = "--external-id"
|
||||
external_id = str(uuid.uuid4())
|
||||
command = [prowler_command, argument, external_id]
|
||||
parsed = self.parser.parse(command)
|
||||
assert parsed.external_id == external_id
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: aws: To use -I/-T options -R option is needed\n"
|
||||
)
|
||||
|
||||
def test_aws_parser_region_f(self):
|
||||
argument = "-f"
|
||||
@@ -984,20 +1024,28 @@ class Test_Parser:
|
||||
assert parsed.subscription_ids[1] == subscription_2
|
||||
|
||||
# Test AWS flags with Azure provider
|
||||
def test_parser_azure_with_aws_flag(self):
|
||||
def test_parser_azure_with_aws_flag(self, capsys):
|
||||
command = [prowler_command, "azure", "-p"]
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: unrecognized arguments: -p\n"
|
||||
)
|
||||
|
||||
# Test Azure flags with AWS provider
|
||||
def test_parser_aws_with_azure_flag(self):
|
||||
def test_parser_aws_with_azure_flag(self, capsys):
|
||||
command = [prowler_command, "aws", "--subscription-ids"]
|
||||
with pytest.raises(SystemExit) as wrapped_exit:
|
||||
_ = self.parser.parse(command)
|
||||
assert wrapped_exit.type == SystemExit
|
||||
assert wrapped_exit.value.code == 2
|
||||
assert (
|
||||
capsys.readouterr().err
|
||||
== f"{prowler_default_usage_error}\nprowler: error: unrecognized arguments: --subscription-ids\n"
|
||||
)
|
||||
|
||||
def test_parser_gcp_auth_credentials_file(self):
|
||||
argument = "--credentials-file"
|
||||
|
||||
Reference in New Issue
Block a user