From 02519a4429a00156eb8c4dc3e596d63bd1574a53 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Mon, 17 Jul 2023 10:09:48 +0200 Subject: [PATCH] fix(assume_role): Set the AWS STS endpoint region (#2587) --- docs/tutorials/aws/authentication.md | 4 + docs/tutorials/aws/role-assumption.md | 6 +- poetry.lock | 3 +- prowler/lib/cli/parser.py | 15 +- prowler/providers/aws/aws_provider.py | 15 +- prowler/providers/aws/config.py | 1 + prowler/providers/aws/lib/arn/arn.py | 8 ++ .../aws/lib/credentials/credentials.py | 15 +- prowler/providers/common/audit_info.py | 13 +- tests/providers/aws/aws_provider_test.py | 87 ++++++++++- .../aws/lib/credentials/credentials_test.py | 136 ++++++++++++++++++ 11 files changed, 281 insertions(+), 22 deletions(-) create mode 100644 prowler/providers/aws/config.py diff --git a/docs/tutorials/aws/authentication.md b/docs/tutorials/aws/authentication.md index 971d1aa4..c8ca05d8 100644 --- a/docs/tutorials/aws/authentication.md +++ b/docs/tutorials/aws/authentication.md @@ -29,3 +29,7 @@ If your IAM entity enforces MFA you can use `--mfa` and Prowler will ask you to - ARN of your MFA device - TOTP (Time-Based One-Time Password) + +## STS Endpoint Region + +If you are using Prowler in AWS regions that are not enabled by default you need to use the argument `--sts-endpoint-region` to point the AWS STS API calls `assume-role` and `get-caller-identity` to the non-default region, e.g.: `prowler aws --sts-endpoint-region eu-south-2`. diff --git a/docs/tutorials/aws/role-assumption.md b/docs/tutorials/aws/role-assumption.md index ae302bb5..834b2ee3 100644 --- a/docs/tutorials/aws/role-assumption.md +++ b/docs/tutorials/aws/role-assumption.md @@ -23,13 +23,17 @@ prowler aws -R arn:aws:iam:::role/ prowler aws -T/--session-duration -I/--external-id -R arn:aws:iam:::role/ ``` +## STS Endpoint Region + +If you are using Prowler in AWS regions that are not enabled by default you need to use the argument `--sts-endpoint-region` to point the AWS STS API calls `assume-role` and `get-caller-identity` to the non-default region, e.g.: `prowler aws --sts-endpoint-region eu-south-2`. + ## Role MFA If your IAM Role has MFA configured you can use `--mfa` along with `-R`/`--role ` and Prowler will ask you to input the following values to get a new temporary session for the IAM Role provided: + - ARN of your MFA device - TOTP (Time-Based One-Time Password) - ## Create Role To create a role to be assumed in one or multiple accounts you can use either as CloudFormation Stack or StackSet the following [template](https://github.com/prowler-cloud/prowler/blob/master/permissions/create_role_to_assume_cfn.yaml) and adapt it. diff --git a/poetry.lock b/poetry.lock index d40eb8f2..c0814ef7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2378,7 +2378,8 @@ files = [ {file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win32.whl", hash = "sha256:763d65baa3b952479c4e972669f679fe490eee058d5aa85da483ebae2009d231"}, {file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win_amd64.whl", hash = "sha256:d000f258cf42fec2b1bbf2863c61d7b8918d31ffee905da62dede869254d3b8a"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:045e0626baf1c52e5527bd5db361bc83180faaba2ff586e763d3d5982a876a9e"}, - {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_12_6_arm64.whl", hash = "sha256:721bc4ba4525f53f6a611ec0967bdcee61b31df5a56801281027a3a6d1c2daf5"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:1a6391a7cabb7641c32517539ca42cf84b87b667bad38b78d4d42dd23e957c81"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:9c7617df90c1365638916b98cdd9be833d31d337dbcd722485597b43c4a215bf"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:41d0f1fa4c6830176eef5b276af04c89320ea616655d01327d5ce65e50575c94"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win32.whl", hash = "sha256:f6d3d39611ac2e4f62c3128a9eed45f19a6608670c5a2f4f07f24e8de3441d38"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win_amd64.whl", hash = "sha256:da538167284de58a52109a9b89b8f6a53ff8437dd6dc26d33b57bf6699153122"}, diff --git a/prowler/lib/cli/parser.py b/prowler/lib/cli/parser.py index 7c311991..da08bd38 100644 --- a/prowler/lib/cli/parser.py +++ b/prowler/lib/cli/parser.py @@ -8,14 +8,7 @@ from prowler.config.config import ( default_output_directory, ) from prowler.providers.aws.aws_provider import get_aws_available_regions -from prowler.providers.aws.lib.arn.arn import is_valid_arn - - -def arn_type(arn: str) -> bool: - """arn_type returns a string ARN if it is valid and raises an argparse.ArgumentError if not.""" - if not is_valid_arn(arn): - raise argparse.ArgumentError("Invalid ARN") - return arn +from prowler.providers.aws.lib.arn.arn import arn_type class ProwlerArgumentParser: @@ -289,6 +282,12 @@ Detailed documentation at https://docs.prowler.cloud help="ARN of the role to be assumed", # Pending ARN validation ) + aws_auth_subparser.add_argument( + "--sts-endpoint-region", + nargs="?", + default=None, + help="Specify the AWS STS endpoint region to use. Read more at https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_enable-regions.html", + ) aws_auth_subparser.add_argument( "--mfa", action="store_true", diff --git a/prowler/providers/aws/aws_provider.py b/prowler/providers/aws/aws_provider.py index 616a30ec..9bb5168e 100644 --- a/prowler/providers/aws/aws_provider.py +++ b/prowler/providers/aws/aws_provider.py @@ -10,6 +10,7 @@ from prowler.config.config import aws_services_json_file from prowler.lib.check.check import list_modules, recover_checks_from_service from prowler.lib.logger import logger from prowler.lib.utils.utils import open_file, parse_json_file +from prowler.providers.aws.config import AWS_STS_GLOBAL_ENDPOINT_REGION from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info @@ -105,7 +106,11 @@ class AWS_Provider: return refreshed_credentials -def assume_role(session: session.Session, assumed_role_info: AWS_Assume_Role) -> dict: +def assume_role( + session: session.Session, + assumed_role_info: AWS_Assume_Role, + sts_endpoint_region: str = None, +) -> dict: try: assume_role_arguments = { "RoleArn": assumed_role_info.role_arn, @@ -113,6 +118,7 @@ def assume_role(session: session.Session, assumed_role_info: AWS_Assume_Role) -> "DurationSeconds": assumed_role_info.session_duration, } + # Set the info to assume the role from the partition, account and role name if assumed_role_info.external_id: assume_role_arguments["ExternalId"] = assumed_role_info.external_id @@ -121,8 +127,11 @@ def assume_role(session: session.Session, assumed_role_info: AWS_Assume_Role) -> assume_role_arguments["SerialNumber"] = mfa_ARN assume_role_arguments["TokenCode"] = mfa_TOTP - # set the info to assume the role from the partition, account and role name - sts_client = session.client("sts") + # Set the STS Endpoint Region + if sts_endpoint_region is None: + sts_endpoint_region = AWS_STS_GLOBAL_ENDPOINT_REGION + + sts_client = session.client("sts", sts_endpoint_region) assumed_credentials = sts_client.assume_role(**assume_role_arguments) except Exception as error: logger.critical( diff --git a/prowler/providers/aws/config.py b/prowler/providers/aws/config.py new file mode 100644 index 00000000..fed5d2dc --- /dev/null +++ b/prowler/providers/aws/config.py @@ -0,0 +1 @@ +AWS_STS_GLOBAL_ENDPOINT_REGION = "us-east-1" diff --git a/prowler/providers/aws/lib/arn/arn.py b/prowler/providers/aws/lib/arn/arn.py index 653715f0..b2bdfa26 100644 --- a/prowler/providers/aws/lib/arn/arn.py +++ b/prowler/providers/aws/lib/arn/arn.py @@ -1,4 +1,5 @@ import re +from argparse import ArgumentError from prowler.providers.aws.lib.arn.error import ( RoleArnParsingEmptyResource, @@ -11,6 +12,13 @@ from prowler.providers.aws.lib.arn.error import ( from prowler.providers.aws.lib.arn.models import ARN +def arn_type(arn: str) -> bool: + """arn_type returns a string ARN if it is valid and raises an argparse.ArgumentError if not.""" + if not is_valid_arn(arn): + raise ArgumentError("Invalid ARN") + return arn + + def parse_iam_credentials_arn(arn: str) -> ARN: arn_parsed = ARN(arn) # First check if region is empty (in IAM ARN's region is always empty) diff --git a/prowler/providers/aws/lib/credentials/credentials.py b/prowler/providers/aws/lib/credentials/credentials.py index e9539122..66eb915e 100644 --- a/prowler/providers/aws/lib/credentials/credentials.py +++ b/prowler/providers/aws/lib/credentials/credentials.py @@ -4,15 +4,21 @@ from boto3 import session from colorama import Fore, Style from prowler.lib.logger import logger +from prowler.providers.aws.config import AWS_STS_GLOBAL_ENDPOINT_REGION from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info -AWS_STS_GLOBAL_ENDPOINT_REGION = "us-east-1" - -def validate_aws_credentials(session: session, input_regions: list) -> dict: +def validate_aws_credentials( + session: session, input_regions: list, sts_endpoint_region: str = None +) -> dict: try: # For a valid STS GetCallerIdentity we have to use the right AWS Region - if input_regions is None or len(input_regions) == 0: + # Check if the --sts-endpoint-region is set + if sts_endpoint_region is not None: + aws_region = sts_endpoint_region + # If there is no region passed with -f/--region/--filter-region + elif input_regions is None or len(input_regions) == 0: + # If you have a region configured in your AWS config or credentials file if session.region_name is not None: aws_region = session.region_name else: @@ -22,6 +28,7 @@ def validate_aws_credentials(session: session, input_regions: list) -> dict: else: # Get the first region passed to the -f/--region aws_region = input_regions[0] + validate_credentials_client = session.client("sts", aws_region) caller_identity = validate_credentials_client.get_caller_identity() # Include the region where the caller_identity has validated the credentials diff --git a/prowler/providers/common/audit_info.py b/prowler/providers/common/audit_info.py index 6f275240..8aa24e6a 100644 --- a/prowler/providers/common/audit_info.py +++ b/prowler/providers/common/audit_info.py @@ -81,6 +81,9 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE input_session_duration = arguments.get("session_duration") input_external_id = arguments.get("external_id") + # STS Endpoint Region + sts_endpoint_region = arguments.get("sts_endpoint_region") + # Since the range(i,j) goes from i to j-1 we have to j+1 if input_session_duration and input_session_duration not in range(900, 43201): raise Exception("Value for -T option must be between 900 and 43200") @@ -128,7 +131,7 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE logger.info("Validating credentials ...") # Verificate if we have valid credentials caller_identity = validate_aws_credentials( - current_audit_info.original_session, input_regions + current_audit_info.original_session, input_regions, sts_endpoint_region ) logger.info("Credentials validated") @@ -168,7 +171,9 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE f"Getting organizations metadata for account {organizations_role_arn}" ) assumed_credentials = assume_role( - aws_provider.aws_session, aws_provider.role_info + aws_provider.aws_session, + aws_provider.role_info, + sts_endpoint_region, ) current_audit_info.organizations_metadata = get_organizations_metadata( current_audit_info.audited_account, assumed_credentials @@ -201,7 +206,9 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE ) # Assume the role assumed_role_response = assume_role( - aws_provider.aws_session, aws_provider.role_info + aws_provider.aws_session, + aws_provider.role_info, + sts_endpoint_region, ) logger.info("Role assumed") # Set the info needed to create a session with an assumed role diff --git a/tests/providers/aws/aws_provider_test.py b/tests/providers/aws/aws_provider_test.py index 65c4804a..0f7c9378 100644 --- a/tests/providers/aws/aws_provider_test.py +++ b/tests/providers/aws/aws_provider_test.py @@ -24,7 +24,7 @@ class Test_AWS_Provider: role_name = "test-role" role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" session_duration_seconds = 900 - audited_regions = "eu-west-1" + audited_regions = ["eu-west-1"] sessionName = "ProwlerAsessmentSession" # Boto 3 client to create our user iam_client = boto3.client("iam", region_name="us-east-1") @@ -105,7 +105,7 @@ class Test_AWS_Provider: role_name = "test-role" role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" session_duration_seconds = 900 - audited_regions = "eu-west-1" + audited_regions = ["eu-west-1"] sessionName = "ProwlerAsessmentSession" # Boto 3 client to create our user iam_client = boto3.client("iam", region_name="us-east-1") @@ -184,6 +184,89 @@ class Test_AWS_Provider: "AssumedRoleId" ].should.have.length_of(21 + 1 + len(sessionName)) + @mock_iam + @mock_sts + def test_assume_role_with_sts_endpoint_region(self): + # Variables + role_name = "test-role" + role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" + session_duration_seconds = 900 + aws_region = "eu-west-1" + sts_endpoint_region = aws_region + audited_regions = [aws_region] + sessionName = "ProwlerAsessmentSession" + # Boto 3 client to create our user + iam_client = boto3.client("iam", region_name=aws_region) + # IAM user + iam_user = iam_client.create_user(UserName="test-user")["User"] + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + # New Boto3 session with the previously create user + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + # Fulfil the input session object for Prowler + audit_info = AWS_Audit_Info( + session_config=None, + original_session=session, + audit_session=None, + audited_account=None, + audited_account_arn=None, + audited_partition=None, + audited_identity_arn=None, + audited_user_id=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=AWS_Assume_Role( + role_arn=role_arn, + session_duration=session_duration_seconds, + external_id=None, + mfa_enabled=False, + ), + audited_regions=audited_regions, + organizations_metadata=None, + audit_resources=None, + mfa_enabled=False, + ) + + # Call assume_role + aws_provider = AWS_Provider(audit_info) + assume_role_response = assume_role( + aws_provider.aws_session, aws_provider.role_info, sts_endpoint_region + ) + # Recover credentials for the assume role operation + credentials = assume_role_response["Credentials"] + # Test the response + # SessionToken + credentials["SessionToken"].should.have.length_of(356) + credentials["SessionToken"].startswith("FQoGZXIvYXdzE") + # AccessKeyId + credentials["AccessKeyId"].should.have.length_of(20) + credentials["AccessKeyId"].startswith("ASIA") + # SecretAccessKey + credentials["SecretAccessKey"].should.have.length_of(40) + # Assumed Role + assume_role_response["AssumedRoleUser"]["Arn"].should.equal( + f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{sessionName}" + ) + # AssumedRoleUser + assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith( + "AROA" + ) + assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith( + ":" + sessionName + ) + assume_role_response["AssumedRoleUser"]["AssumedRoleId"].should.have.length_of( + 21 + 1 + len(sessionName) + ) + def test_generate_regional_clients(self): # New Boto3 session with the previously create user session = boto3.session.Session( diff --git a/tests/providers/aws/lib/credentials/credentials_test.py b/tests/providers/aws/lib/credentials/credentials_test.py index 61abeaa4..f5481df7 100644 --- a/tests/providers/aws/lib/credentials/credentials_test.py +++ b/tests/providers/aws/lib/credentials/credentials_test.py @@ -188,6 +188,47 @@ class Test_AWS_Credentials: assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + @mock_sts + @mock_iam + def test_validate_credentials_commercial_partition_with_regions_none_and_profile_region_but_sts_endpoint_region( + self, + ): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + sts_endpoint_region = aws_region + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials( + session, None, sts_endpoint_region + ) + + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + @mock_sts @mock_iam def test_validate_credentials_china_partition_without_regions_and_profile_region_so_us_east_1( @@ -268,6 +309,53 @@ class Test_AWS_Credentials: assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + @mock_sts + @mock_iam + @patch( + "botocore.client.BaseClient._make_api_call", new=mock_get_caller_identity_china + ) + def test_validate_credentials_china_partition_without_regions_but_sts_endpoint_region( + self, + ): + # AWS Region for AWS CHINA + aws_region = "cn-north-1" + sts_endpoint_region = aws_region + aws_partition = "aws-cn" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials( + session, None, sts_endpoint_region + ) + + # To use GovCloud or China it is either required: + # - Set the AWS profile region with a valid partition region + # - Use the -f/--region with a valid partition region + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + @mock_sts @mock_iam @patch( @@ -310,3 +398,51 @@ class Test_AWS_Credentials: assert caller_identity_arn.resource_type == "user" assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_get_caller_identity_gov_cloud, + ) + def test_validate_credentials_gov_cloud_partition_without_regions_but_sts_endpoint_region( + self, + ): + # AWS Region for US GOV CLOUD + aws_region = "us-gov-east-1" + sts_endpoint_region = aws_region + aws_partition = "aws-us-gov" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials( + session, None, sts_endpoint_region + ) + + # To use GovCloud or China it is either required: + # - Set the AWS profile region with a valid partition region + # - Use the -f/--region with a valid partition region + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER