From f0db63da351bb96821fed07355a613b8befd8342 Mon Sep 17 00:00:00 2001 From: Nacho Rivera <59198746+n4ch04@users.noreply.github.com> Date: Wed, 4 Jan 2023 08:48:00 +0100 Subject: [PATCH] fix(): Refresh credentials when assuming role (#1636) --- prowler/providers/aws/aws_provider.py | 21 ++++++++++----------- prowler/providers/common/audit_info.py | 18 ++++++++++-------- tests/providers/aws/aws_provider_test.py | 11 +++++++++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/prowler/providers/aws/aws_provider.py b/prowler/providers/aws/aws_provider.py index 34cea459..9bab46dc 100644 --- a/prowler/providers/aws/aws_provider.py +++ b/prowler/providers/aws/aws_provider.py @@ -8,7 +8,7 @@ from botocore.session import get_session from prowler.config.config import aws_services_json_file from prowler.lib.logger import logger from prowler.lib.utils.utils import open_file, parse_json_file -from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info +from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info ################## AWS PROVIDER @@ -43,7 +43,6 @@ class AWS_Provider: assumed_botocore_session.set_config_variable( "region", audit_info.profile_region ) - return session.Session( profile_name=audit_info.profile, botocore_session=assumed_botocore_session, @@ -62,7 +61,7 @@ class AWS_Provider: def refresh(self): logger.info("Refreshing assumed credentials...") - response = assume_role(self.role_info) + response = assume_role(self.aws_session, self.role_info) refreshed_credentials = dict( # Keys of the dict has to be the same as those that are being searched in the parent class # https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L609 @@ -76,24 +75,24 @@ class AWS_Provider: return refreshed_credentials -def assume_role(audit_info: AWS_Audit_Info) -> dict: +def assume_role(session: session.Session, assumed_role_info: AWS_Assume_Role) -> dict: try: # set the info to assume the role from the partition, account and role name - sts_client = audit_info.original_session.client("sts") + sts_client = session.client("sts") # If external id, set it to the assume role api call - if audit_info.assumed_role_info.external_id: + if assumed_role_info.external_id: assumed_credentials = sts_client.assume_role( - RoleArn=audit_info.assumed_role_info.role_arn, + RoleArn=assumed_role_info.role_arn, RoleSessionName="ProwlerProAsessmentSession", - DurationSeconds=audit_info.assumed_role_info.session_duration, - ExternalId=audit_info.assumed_role_info.external_id, + DurationSeconds=assumed_role_info.session_duration, + ExternalId=assumed_role_info.external_id, ) # else assume the role without the external id else: assumed_credentials = sts_client.assume_role( - RoleArn=audit_info.assumed_role_info.role_arn, + RoleArn=assumed_role_info.role_arn, RoleSessionName="ProwlerProAsessmentSession", - DurationSeconds=audit_info.assumed_role_info.session_duration, + DurationSeconds=assumed_role_info.session_duration, ) except Exception as error: logger.critical(f"{error.__class__.__name__} -- {error}") diff --git a/prowler/providers/common/audit_info.py b/prowler/providers/common/audit_info.py index 5a6bcb9c..09feb0f6 100644 --- a/prowler/providers/common/audit_info.py +++ b/prowler/providers/common/audit_info.py @@ -125,9 +125,8 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE logger.info("Generating original session ...") # Create an global original session using only profile/basic credentials info - current_audit_info.original_session = AWS_Provider( - current_audit_info - ).get_session() + aws_provider = AWS_Provider(current_audit_info) + current_audit_info.original_session = aws_provider.aws_session logger.info("Validating credentials ...") # Verificate if we have valid credentials caller_identity = self.validate_credentials(current_audit_info.original_session) @@ -165,7 +164,9 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE logger.info( f"Getting organizations metadata for account {organizations_role_arn}" ) - assumed_credentials = assume_role(current_audit_info) + assumed_credentials = assume_role( + aws_provider.aws_session, aws_provider.role_info + ) current_audit_info.organizations_metadata = ( self.get_organizations_metadata( current_audit_info.audited_account, assumed_credentials @@ -197,7 +198,9 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE f"Assuming role {current_audit_info.assumed_role_info.role_arn}" ) # Assume the role - assumed_role_response = assume_role(current_audit_info) + assumed_role_response = assume_role( + aws_provider.aws_session, aws_provider.role_info + ) logger.info("Role assumed") # Set the info needed to create a session with an assumed role current_audit_info.credentials = AWS_Credentials( @@ -212,8 +215,8 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE ], expiration=assumed_role_response["Credentials"]["Expiration"], ) - assumed_session = AWS_Provider(current_audit_info).get_session() - + # new session is needed + assumed_session = aws_provider.set_session(current_audit_info) if assumed_session: logger.info("Audit session is the new session created assuming role") current_audit_info.audit_session = assumed_session @@ -222,7 +225,6 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE else: logger.info("Audit session is the original one") current_audit_info.audit_session = current_audit_info.original_session - # Setting default region of session if current_audit_info.audit_session.region_name: current_audit_info.profile_region = ( diff --git a/tests/providers/aws/aws_provider_test.py b/tests/providers/aws/aws_provider_test.py index 5a850b0d..0efd904a 100644 --- a/tests/providers/aws/aws_provider_test.py +++ b/tests/providers/aws/aws_provider_test.py @@ -2,7 +2,11 @@ import boto3 import sure # noqa from moto import mock_iam, mock_sts -from prowler.providers.aws.aws_provider import assume_role, generate_regional_clients +from prowler.providers.aws.aws_provider import ( + AWS_Provider, + assume_role, + generate_regional_clients, +) from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info ACCOUNT_ID = 123456789012 @@ -55,7 +59,10 @@ class Test_AWS_Provider: ) # Call assume_role - assume_role_response = assume_role(audit_info) + aws_provider = AWS_Provider(audit_info) + assume_role_response = assume_role( + aws_provider.aws_session, aws_provider.role_info + ) # Recover credentials for the assume role operation credentials = assume_role_response["Credentials"] # Test the response