fix(): Refresh credentials when assuming role (#1636)

This commit is contained in:
Nacho Rivera
2023-01-04 08:48:00 +01:00
committed by GitHub
parent 9b8c80b74d
commit f0db63da35
3 changed files with 29 additions and 21 deletions

View File

@@ -8,7 +8,7 @@ from botocore.session import get_session
from prowler.config.config import aws_services_json_file
from prowler.lib.logger import logger
from prowler.lib.utils.utils import open_file, parse_json_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info
################## AWS PROVIDER
@@ -43,7 +43,6 @@ class AWS_Provider:
assumed_botocore_session.set_config_variable(
"region", audit_info.profile_region
)
return session.Session(
profile_name=audit_info.profile,
botocore_session=assumed_botocore_session,
@@ -62,7 +61,7 @@ class AWS_Provider:
def refresh(self):
logger.info("Refreshing assumed credentials...")
response = assume_role(self.role_info)
response = assume_role(self.aws_session, self.role_info)
refreshed_credentials = dict(
# Keys of the dict has to be the same as those that are being searched in the parent class
# https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L609
@@ -76,24 +75,24 @@ class AWS_Provider:
return refreshed_credentials
def assume_role(audit_info: AWS_Audit_Info) -> dict:
def assume_role(session: session.Session, assumed_role_info: AWS_Assume_Role) -> dict:
try:
# set the info to assume the role from the partition, account and role name
sts_client = audit_info.original_session.client("sts")
sts_client = session.client("sts")
# If external id, set it to the assume role api call
if audit_info.assumed_role_info.external_id:
if assumed_role_info.external_id:
assumed_credentials = sts_client.assume_role(
RoleArn=audit_info.assumed_role_info.role_arn,
RoleArn=assumed_role_info.role_arn,
RoleSessionName="ProwlerProAsessmentSession",
DurationSeconds=audit_info.assumed_role_info.session_duration,
ExternalId=audit_info.assumed_role_info.external_id,
DurationSeconds=assumed_role_info.session_duration,
ExternalId=assumed_role_info.external_id,
)
# else assume the role without the external id
else:
assumed_credentials = sts_client.assume_role(
RoleArn=audit_info.assumed_role_info.role_arn,
RoleArn=assumed_role_info.role_arn,
RoleSessionName="ProwlerProAsessmentSession",
DurationSeconds=audit_info.assumed_role_info.session_duration,
DurationSeconds=assumed_role_info.session_duration,
)
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")

View File

@@ -125,9 +125,8 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
logger.info("Generating original session ...")
# Create an global original session using only profile/basic credentials info
current_audit_info.original_session = AWS_Provider(
current_audit_info
).get_session()
aws_provider = AWS_Provider(current_audit_info)
current_audit_info.original_session = aws_provider.aws_session
logger.info("Validating credentials ...")
# Verificate if we have valid credentials
caller_identity = self.validate_credentials(current_audit_info.original_session)
@@ -165,7 +164,9 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
logger.info(
f"Getting organizations metadata for account {organizations_role_arn}"
)
assumed_credentials = assume_role(current_audit_info)
assumed_credentials = assume_role(
aws_provider.aws_session, aws_provider.role_info
)
current_audit_info.organizations_metadata = (
self.get_organizations_metadata(
current_audit_info.audited_account, assumed_credentials
@@ -197,7 +198,9 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
f"Assuming role {current_audit_info.assumed_role_info.role_arn}"
)
# Assume the role
assumed_role_response = assume_role(current_audit_info)
assumed_role_response = assume_role(
aws_provider.aws_session, aws_provider.role_info
)
logger.info("Role assumed")
# Set the info needed to create a session with an assumed role
current_audit_info.credentials = AWS_Credentials(
@@ -212,8 +215,8 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
],
expiration=assumed_role_response["Credentials"]["Expiration"],
)
assumed_session = AWS_Provider(current_audit_info).get_session()
# new session is needed
assumed_session = aws_provider.set_session(current_audit_info)
if assumed_session:
logger.info("Audit session is the new session created assuming role")
current_audit_info.audit_session = assumed_session
@@ -222,7 +225,6 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
else:
logger.info("Audit session is the original one")
current_audit_info.audit_session = current_audit_info.original_session
# Setting default region of session
if current_audit_info.audit_session.region_name:
current_audit_info.profile_region = (

View File

@@ -2,7 +2,11 @@ import boto3
import sure # noqa
from moto import mock_iam, mock_sts
from prowler.providers.aws.aws_provider import assume_role, generate_regional_clients
from prowler.providers.aws.aws_provider import (
AWS_Provider,
assume_role,
generate_regional_clients,
)
from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info
ACCOUNT_ID = 123456789012
@@ -55,7 +59,10 @@ class Test_AWS_Provider:
)
# Call assume_role
assume_role_response = assume_role(audit_info)
aws_provider = AWS_Provider(audit_info)
assume_role_response = assume_role(
aws_provider.aws_session, aws_provider.role_info
)
# Recover credentials for the assume role operation
credentials = assume_role_response["Credentials"]
# Test the response