From e84f5f184eb6afb5baff29d6891dc8fe5173cae7 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Thu, 18 May 2023 15:51:57 +0200 Subject: [PATCH] fix(sts): Use the right region to validate credentials (#2349) Co-authored-by: Sergio Garcia --- docs/tutorials/aws/regions-and-partitions.md | 79 +++++ mkdocs.yml | 1 + poetry.lock | 14 +- prowler/providers/aws/aws_provider.py | 4 +- prowler/providers/aws/lib/arn/arn.py | 66 ++-- prowler/providers/aws/lib/arn/error.py | 28 +- prowler/providers/aws/lib/arn/models.py | 57 ++++ .../providers/aws/lib/credentials/__init__.py | 0 .../aws/lib/credentials/credentials.py | 59 ++++ .../aws/lib/organizations/__init__.py | 0 .../aws/lib/organizations/organizations.py | 40 +++ .../lib/quick_inventory/quick_inventory.py | 19 +- prowler/providers/common/audit_info.py | 107 ++---- pyproject.toml | 1 - tests/providers/aws/lib/arn/arn_test.py | 296 ++++++++++++++++- .../aws/lib/credentials/credentials_test.py | 312 ++++++++++++++++++ .../lib/organizations/organizations_test.py | 61 ++++ tests/providers/common/audit_info_test.py | 290 +++++++--------- 18 files changed, 1089 insertions(+), 345 deletions(-) create mode 100644 docs/tutorials/aws/regions-and-partitions.md create mode 100644 prowler/providers/aws/lib/arn/models.py create mode 100644 prowler/providers/aws/lib/credentials/__init__.py create mode 100644 prowler/providers/aws/lib/credentials/credentials.py create mode 100644 prowler/providers/aws/lib/organizations/__init__.py create mode 100644 prowler/providers/aws/lib/organizations/organizations.py create mode 100644 tests/providers/aws/lib/credentials/credentials_test.py create mode 100644 tests/providers/aws/lib/organizations/organizations_test.py diff --git a/docs/tutorials/aws/regions-and-partitions.md b/docs/tutorials/aws/regions-and-partitions.md new file mode 100644 index 00000000..fc41d893 --- /dev/null +++ b/docs/tutorials/aws/regions-and-partitions.md @@ -0,0 +1,79 @@ +# AWS Regions and Partitions + +By default Prowler is able to scan the following AWS partitions: +- Commercial: `aws` +- China: `aws-cn` +- GovCloud (US): `aws-us-gov` + +> To check the available regions for each partition and service please refer to the following document [aws_regions_by_service.json](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/aws/aws_regions_by_service.json) + +It is important to take into consideration that to scan the China (`aws-cn`) or GovCloud (`aws-us-gov`) partitions it is either required to have a valid region for that partition in your AWS credentials (Refer to https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials for more information) or to specify the regions you want to audit for that partition using the `-f/--region` flag. + +You can get more information about the available partitions and regions in the following [Botocore](https://github.com/boto/botocore) file https://github.com/boto/botocore/blob/22a19ea7c4c2c4dd7df4ab8c32733cba0c7597a4/botocore/data/partitions.json +## AWS China + +To scan your AWS Account in the China partition (`aws-cn`): + +- Using the `-f/--region` flag: +``` +prowler aws --region cn-north-1 cn-northwest-1 +``` +- Using the region configured in your AWS profile at `~/.aws/credentials` or `~/.aws/config`: +``` +[default] +aws_access_key_id = XXXXXXXXXXXXXXXXXXX +aws_secret_access_key = XXXXXXXXXXXXXXXXXXX +region = cn-north-1 +``` +> With this option all the partition regions will be scanned without the need of use the `-f/--region` flag + + +## AWS GovCloud (US) + +To scan your AWS Account in the GovCloud (US) partition (`aws-us-gov`): + +- Using the `-f/--region` flag: +``` +prowler aws --region us-gov-east-1 us-gov-west-1 +``` +- Using the region configured in your AWS profile at `~/.aws/credentials` or `~/.aws/config`: +``` +[default] +aws_access_key_id = XXXXXXXXXXXXXXXXXXX +aws_secret_access_key = XXXXXXXXXXXXXXXXXXX +region = us-gov-east-1 +``` +> With this option all the partition regions will be scanned without the need of use the `-f/--region` flag + + +## AWS ISO (US & Europe) + +For the AWS ISO partitions, which are known as "secret partitions" and are air-gapped from the internet there is no builtin way to scanned it. In this scenario if you want to audit an AWS Account in one of the AWS ISO partitions you should manually update the [aws_regions_by_service.json](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/aws/aws_regions_by_service.json) and include the partition, region and services, e.g.: +```json +"iam": { + "regions": { + "aws": [ + "eu-west-1", + "us-east-1", + ], + "aws-cn": [ + "cn-north-1", + "cn-northwest-1" + ], + "aws-us-gov": [ + "us-gov-east-1", + "us-gov-west-1" + ], + "aws-iso": [ + "aws-iso-global", + "us-iso-east-1", + "us-iso-west-1" + ], + "aws-iso-b": [ + "aws-iso-b-global", + "us-isob-east-1" + ], + "aws-iso-e": [], + } +}, +``` diff --git a/mkdocs.yml b/mkdocs.yml index 8c6b393c..1ba9ea8e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -43,6 +43,7 @@ nav: - Assume Role: tutorials/aws/role-assumption.md - AWS Security Hub: tutorials/aws/securityhub.md - AWS Organizations: tutorials/aws/organizations.md + - AWS Regions and Partitions: tutorials/aws/regions-and-partitions.md - Scan Multiple AWS Accounts: tutorials/aws/multiaccount.md - AWS CloudShell: tutorials/aws/cloudshell.md - Checks v2 to v3 Mapping: tutorials/aws/v2_to_v3_checks_mapping.md diff --git a/poetry.lock b/poetry.lock index 750804fe..12378343 100644 --- a/poetry.lock +++ b/poetry.lock @@ -28,18 +28,6 @@ files = [ about-time = "4.2.1" grapheme = "0.6.0" -[[package]] -name = "arnparse" -version = "0.0.2" -description = "Parse ARNs using Python" -category = "main" -optional = false -python-versions = "*" -files = [ - {file = "arnparse-0.0.2-py2.py3-none-any.whl", hash = "sha256:b0906734e4b8f19e39b1e32944c6cd6274b6da90c066a83882ac7a11d27553e0"}, - {file = "arnparse-0.0.2.tar.gz", hash = "sha256:cb87f17200d07121108a9085d4a09cc69a55582647776b9a917b0b1f279db8f8"}, -] - [[package]] name = "astroid" version = "2.15.4" @@ -2887,4 +2875,4 @@ docs = ["mkdocs", "mkdocs-material"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "d79eb4bb147bb8298a228002aa332ff8c8e380702eb81bf8012586c899890bfc" +content-hash = "bf40bfed6a88bde379337be45f5be5dd468ea3c7c79ff4ff92448ead3fe79308" diff --git a/prowler/providers/aws/aws_provider.py b/prowler/providers/aws/aws_provider.py index c328a794..c373669a 100644 --- a/prowler/providers/aws/aws_provider.py +++ b/prowler/providers/aws/aws_provider.py @@ -36,7 +36,7 @@ class AWS_Provider: secret_key=audit_info.credentials.aws_secret_access_key, token=audit_info.credentials.aws_session_token, expiry_time=audit_info.credentials.expiration, - refresh_using=self.refresh, + refresh_using=self.refresh_credentials, method="sts-assume-role", ) # Here we need the botocore session since it needs to use refreshable credentials @@ -60,7 +60,7 @@ class AWS_Provider: # Refresh credentials method using assume role # This method is called "adding ()" to the name, so it cannot accept arguments # https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L570 - def refresh(self): + def refresh_credentials(self): logger.info("Refreshing assumed credentials...") response = assume_role(self.aws_session, self.role_info) diff --git a/prowler/providers/aws/lib/arn/arn.py b/prowler/providers/aws/lib/arn/arn.py index 930a12c5..b2bdcabf 100644 --- a/prowler/providers/aws/lib/arn/arn.py +++ b/prowler/providers/aws/lib/arn/arn.py @@ -1,50 +1,48 @@ import re -from arnparse import arnparse - from prowler.providers.aws.lib.arn.error import ( RoleArnParsingEmptyResource, - RoleArnParsingFailedMissingFields, RoleArnParsingIAMRegionNotEmpty, RoleArnParsingInvalidAccountID, RoleArnParsingInvalidResourceType, RoleArnParsingPartitionEmpty, - RoleArnParsingServiceNotIAM, + RoleArnParsingServiceNotIAMnorSTS, ) +from prowler.providers.aws.lib.arn.models import ARN -def arn_parsing(arn): - # check for number of fields, must be six - if len(arn.split(":")) != 6: - raise RoleArnParsingFailedMissingFields +def parse_iam_credentials_arn(arn: str) -> ARN: + arn_parsed = ARN(arn) + # First check if region is empty (in IAM ARN's region is always empty) + if arn_parsed.region: + raise RoleArnParsingIAMRegionNotEmpty else: - arn_parsed = arnparse(arn) - # First check if region is empty (in IAM arns region is always empty) - if arn_parsed.region is not None: - raise RoleArnParsingIAMRegionNotEmpty + # check if needed fields are filled: + # - partition + # - service + # - account_id + # - resource_type + # - resource + if arn_parsed.partition is None or arn_parsed.partition == "": + raise RoleArnParsingPartitionEmpty + elif arn_parsed.service != "iam" and arn_parsed.service != "sts": + raise RoleArnParsingServiceNotIAMnorSTS + elif ( + arn_parsed.account_id is None + or len(arn_parsed.account_id) != 12 + or not arn_parsed.account_id.isnumeric() + ): + raise RoleArnParsingInvalidAccountID + elif ( + arn_parsed.resource_type != "role" + and arn_parsed.resource_type != "user" + and arn_parsed.resource_type != "assumed-role" + ): + raise RoleArnParsingInvalidResourceType + elif arn_parsed.resource == "": + raise RoleArnParsingEmptyResource else: - # check if needed fields are filled: - # - partition - # - service - # - account_id - # - resource_type - # - resource - if arn_parsed.partition is None: - raise RoleArnParsingPartitionEmpty - elif arn_parsed.service != "iam": - raise RoleArnParsingServiceNotIAM - elif ( - arn_parsed.account_id is None - or len(arn_parsed.account_id) != 12 - or not arn_parsed.account_id.isnumeric() - ): - raise RoleArnParsingInvalidAccountID - elif arn_parsed.resource_type != "role": - raise RoleArnParsingInvalidResourceType - elif arn_parsed.resource == "": - raise RoleArnParsingEmptyResource - else: - return arn_parsed + return arn_parsed def is_valid_arn(arn: str) -> bool: diff --git a/prowler/providers/aws/lib/arn/error.py b/prowler/providers/aws/lib/arn/error.py index 2084953a..13e78421 100644 --- a/prowler/providers/aws/lib/arn/error.py +++ b/prowler/providers/aws/lib/arn/error.py @@ -1,43 +1,49 @@ class RoleArnParsingFailedMissingFields(Exception): - # The arn contains a numberof fields different than six separated by :" + # The ARN contains a numberof fields different than six separated by :" def __init__(self): - self.message = "The assumed role arn contains a number of fields different than six separated by :, please input a valid arn" + self.message = "The assumed role ARN contains an invalid number of fields separated by : or it does not start by arn, please input a valid ARN" super().__init__(self.message) class RoleArnParsingIAMRegionNotEmpty(Exception): - # The arn contains a non-empty value for region, since it is an IAM arn is not valid + # The ARN contains a non-empty value for region, since it is an IAM ARN is not valid def __init__(self): - self.message = "The assumed role arn contains a non-empty value for region, since it is an IAM arn is not valid, please input a valid arn" + self.message = "The assumed role ARN contains a non-empty value for region, since it is an IAM ARN is not valid, please input a valid ARN" super().__init__(self.message) class RoleArnParsingPartitionEmpty(Exception): - # The arn contains an empty value for partition + # The ARN contains an empty value for partition def __init__(self): - self.message = "The assumed role arn does not contain a value for partition, please input a valid arn" + self.message = "The assumed role ARN does not contain a value for partition, please input a valid ARN" super().__init__(self.message) -class RoleArnParsingServiceNotIAM(Exception): +class RoleArnParsingServiceNotIAMnorSTS(Exception): def __init__(self): - self.message = "The assumed role arn contains a value for service distinct than iam, please input a valid arn" + self.message = "The assumed role ARN contains a value for service distinct than IAM or STS, please input a valid ARN" + super().__init__(self.message) + + +class RoleArnParsingServiceNotSTS(Exception): + def __init__(self): + self.message = "The assumed role ARN contains a value for service distinct than STS, please input a valid ARN" super().__init__(self.message) class RoleArnParsingInvalidAccountID(Exception): def __init__(self): - self.message = "The assumed role arn contains a value for account id empty or invalid, a valid account id must be composed of 12 numbers, please input a valid arn" + self.message = "The assumed role ARN contains a value for account id empty or invalid, a valid account id must be composed of 12 numbers, please input a valid ARN" super().__init__(self.message) class RoleArnParsingInvalidResourceType(Exception): def __init__(self): - self.message = "The assumed role arn contains a value for resource type different than role, please input a valid arn" + self.message = "The assumed role ARN contains a value for resource type different than role, please input a valid ARN" super().__init__(self.message) class RoleArnParsingEmptyResource(Exception): def __init__(self): - self.message = "The assumed role arn does not contain a value for resource, please input a valid arn" + self.message = "The assumed role ARN does not contain a value for resource, please input a valid ARN" super().__init__(self.message) diff --git a/prowler/providers/aws/lib/arn/models.py b/prowler/providers/aws/lib/arn/models.py new file mode 100644 index 00000000..9c3a2de5 --- /dev/null +++ b/prowler/providers/aws/lib/arn/models.py @@ -0,0 +1,57 @@ +from typing import Optional + +from pydantic import BaseModel + +from prowler.providers.aws.lib.arn.error import RoleArnParsingFailedMissingFields + + +class ARN(BaseModel): + partition: str + service: str + region: Optional[str] # In IAM ARN's do not have region + account_id: str + resource: str + resource_type: str + + def __init__(self, arn): + # Validate the ARN + ## Check that arn starts with arn + if not arn.startswith("arn:"): + raise RoleArnParsingFailedMissingFields + ## Retrieve fields + arn_elements = arn.split(":", 5) + data = { + "partition": arn_elements[1], + "service": arn_elements[2], + "region": arn_elements[3] if arn_elements[3] != "" else None, + "account_id": arn_elements[4], + "resource": arn_elements[5], + "resource_type": get_arn_resource_type(arn, arn_elements[2]), + } + if "/" in data["resource"]: + data["resource"] = data["resource"].split("/", 1)[1] + elif ":" in data["resource"]: + data["resource"] = data["resource"].split(":", 1)[1] + + # Calls Pydantic's BaseModel __init__ + super().__init__(**data) + + +def get_arn_resource_type(arn, service): + if service == "s3": + resource_type = "bucket" + elif service == "sns": + resource_type = "topic" + elif service == "sqs": + resource_type = "queue" + elif service == "apigateway": + split_parts = arn.split(":")[5].split("/") + if "integration" in split_parts and "responses" in split_parts: + resource_type = "restapis-resources-methods-integration-response" + elif "documentation" in split_parts and "parts" in split_parts: + resource_type = "restapis-documentation-parts" + else: + resource_type = arn.split(":")[5].split("/")[1] + else: + resource_type = arn.split(":")[5].split("/")[0] + return resource_type diff --git a/prowler/providers/aws/lib/credentials/__init__.py b/prowler/providers/aws/lib/credentials/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/lib/credentials/credentials.py b/prowler/providers/aws/lib/credentials/credentials.py new file mode 100644 index 00000000..e9539122 --- /dev/null +++ b/prowler/providers/aws/lib/credentials/credentials.py @@ -0,0 +1,59 @@ +import sys + +from boto3 import session +from colorama import Fore, Style + +from prowler.lib.logger import logger +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_STS_GLOBAL_ENDPOINT_REGION = "us-east-1" + + +def validate_aws_credentials(session: session, input_regions: list) -> dict: + try: + # For a valid STS GetCallerIdentity we have to use the right AWS Region + if input_regions is None or len(input_regions) == 0: + if session.region_name is not None: + aws_region = session.region_name + else: + # If there is no region set passed with -f/--region + # we use the Global STS Endpoint Region, us-east-1 + aws_region = AWS_STS_GLOBAL_ENDPOINT_REGION + else: + # Get the first region passed to the -f/--region + aws_region = input_regions[0] + validate_credentials_client = session.client("sts", aws_region) + caller_identity = validate_credentials_client.get_caller_identity() + # Include the region where the caller_identity has validated the credentials + caller_identity["region"] = aws_region + except Exception as error: + logger.critical( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + sys.exit(1) + else: + return caller_identity + + +def print_aws_credentials(audit_info: AWS_Audit_Info): + # Beautify audited regions, set "all" if there is no filter region + regions = ( + ", ".join(audit_info.audited_regions) + if audit_info.audited_regions is not None + else "all" + ) + # Beautify audited profile, set "default" if there is no profile set + profile = audit_info.profile if audit_info.profile is not None else "default" + + report = f""" +This report is being generated using credentials below: + +AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL} +AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL} +Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL} +""" + # If -A is set, print Assumed Role ARN + if audit_info.assumed_role_info.role_arn is not None: + report += f"""Assumed Role ARN: {Fore.YELLOW}[{audit_info.assumed_role_info.role_arn}]{Style.RESET_ALL} +""" + print(report) diff --git a/prowler/providers/aws/lib/organizations/__init__.py b/prowler/providers/aws/lib/organizations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/lib/organizations/organizations.py b/prowler/providers/aws/lib/organizations/organizations.py new file mode 100644 index 00000000..fc5f13a7 --- /dev/null +++ b/prowler/providers/aws/lib/organizations/organizations.py @@ -0,0 +1,40 @@ +import sys + +from boto3 import client + +from prowler.lib.logger import logger +from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info + + +def get_organizations_metadata( + metadata_account: str, assumed_credentials: dict +) -> AWS_Organizations_Info: + try: + organizations_client = client( + "organizations", + aws_access_key_id=assumed_credentials["Credentials"]["AccessKeyId"], + aws_secret_access_key=assumed_credentials["Credentials"]["SecretAccessKey"], + aws_session_token=assumed_credentials["Credentials"]["SessionToken"], + ) + organizations_metadata = organizations_client.describe_account( + AccountId=metadata_account + ) + list_tags_for_resource = organizations_client.list_tags_for_resource( + ResourceId=metadata_account + ) + except Exception as error: + logger.critical(f"{error.__class__.__name__} -- {error}") + sys.exit(1) + else: + # Convert Tags dictionary to String + account_details_tags = "" + for tag in list_tags_for_resource["Tags"]: + account_details_tags += tag["Key"] + ":" + tag["Value"] + "," + organizations_info = AWS_Organizations_Info( + account_details_email=organizations_metadata["Account"]["Email"], + account_details_name=organizations_metadata["Account"]["Name"], + account_details_arn=organizations_metadata["Account"]["Arn"], + account_details_org=organizations_metadata["Account"]["Arn"].split("/")[1], + account_details_tags=account_details_tags, + ) + return organizations_info diff --git a/prowler/providers/aws/lib/quick_inventory/quick_inventory.py b/prowler/providers/aws/lib/quick_inventory/quick_inventory.py index 391c9be3..b87c8cce 100644 --- a/prowler/providers/aws/lib/quick_inventory/quick_inventory.py +++ b/prowler/providers/aws/lib/quick_inventory/quick_inventory.py @@ -14,6 +14,7 @@ from prowler.config.config import ( output_file_timestamp, ) from prowler.lib.logger import logger +from prowler.providers.aws.lib.arn.models import get_arn_resource_type from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info @@ -153,22 +154,8 @@ def create_inventory_table(resources: list, resources_in_region: dict) -> dict: services[service] = 0 services[service] += 1 - if service == "s3": - resource_type = "bucket" - elif service == "sns": - resource_type = "topic" - elif service == "sqs": - resource_type = "queue" - elif service == "apigateway": - split_parts = resource["arn"].split(":")[5].split("/") - if "integration" in split_parts and "responses" in split_parts: - resource_type = "restapis-resources-methods-integration-response" - elif "documentation" in split_parts and "parts" in split_parts: - resource_type = "restapis-documentation-parts" - else: - resource_type = resource["arn"].split(":")[5].split("/")[1] - else: - resource_type = resource["arn"].split(":")[5].split("/")[0] + resource_type = get_arn_resource_type(resource["arn"], service) + if service not in resources_type: resources_type[service] = {} if resource_type not in resources_type[service]: diff --git a/prowler/providers/common/audit_info.py b/prowler/providers/common/audit_info.py index cd4f8b82..c1db60f3 100644 --- a/prowler/providers/common/audit_info.py +++ b/prowler/providers/common/audit_info.py @@ -1,7 +1,5 @@ import sys -from arnparse import arnparse -from boto3 import client, session from botocore.config import Config from colorama import Fore, Style @@ -12,12 +10,15 @@ from prowler.providers.aws.aws_provider import ( get_checks_from_input_arn, get_regions_from_audit_resources, ) -from prowler.providers.aws.lib.arn.arn import arn_parsing +from prowler.providers.aws.lib.arn.arn import parse_iam_credentials_arn from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info -from prowler.providers.aws.lib.audit_info.models import ( - AWS_Audit_Info, - AWS_Credentials, - AWS_Organizations_Info, +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info, AWS_Credentials +from prowler.providers.aws.lib.credentials.credentials import ( + print_aws_credentials, + validate_aws_credentials, +) +from prowler.providers.aws.lib.organizations.organizations import ( + get_organizations_metadata, ) from prowler.providers.aws.lib.resource_api_tagging.resource_api_tagging import ( get_tagged_resources, @@ -34,39 +35,6 @@ class Audit_Info: def __init__(self): logger.info("Setting Audit Info ...") - def validate_credentials(self, validate_session: session) -> dict: - try: - validate_credentials_client = validate_session.client("sts") - caller_identity = validate_credentials_client.get_caller_identity() - except Exception as error: - logger.critical(f"{error.__class__.__name__} -- {error}") - sys.exit(1) - else: - return caller_identity - - def print_aws_credentials(self, audit_info: AWS_Audit_Info): - # Beautify audited regions, set "all" if there is no filter region - regions = ( - ", ".join(audit_info.audited_regions) - if audit_info.audited_regions is not None - else "all" - ) - # Beautify audited profile, set "default" if there is no profile set - profile = audit_info.profile if audit_info.profile is not None else "default" - - report = f""" -This report is being generated using credentials below: - -AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL} -AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL} -Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL} -""" - # If -A is set, print Assumed Role ARN - if audit_info.assumed_role_info.role_arn is not None: - report += f"""Assumed Role ARN: {Fore.YELLOW}[{audit_info.assumed_role_info.role_arn}]{Style.RESET_ALL} -""" - print(report) - def print_gcp_credentials(self, audit_info: GCP_Audit_Info): # Beautify audited profile, set "default" if there is no profile set try: @@ -100,43 +68,6 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE """ print(report) - def get_organizations_metadata( - self, metadata_account: str, assumed_credentials: dict - ) -> AWS_Organizations_Info: - try: - organizations_client = client( - "organizations", - aws_access_key_id=assumed_credentials["Credentials"]["AccessKeyId"], - aws_secret_access_key=assumed_credentials["Credentials"][ - "SecretAccessKey" - ], - aws_session_token=assumed_credentials["Credentials"]["SessionToken"], - ) - organizations_metadata = organizations_client.describe_account( - AccountId=metadata_account - ) - list_tags_for_resource = organizations_client.list_tags_for_resource( - ResourceId=metadata_account - ) - except Exception as error: - logger.critical(f"{error.__class__.__name__} -- {error}") - sys.exit(1) - else: - # Convert Tags dictionary to String - account_details_tags = "" - for tag in list_tags_for_resource["Tags"]: - account_details_tags += tag["Key"] + ":" + tag["Value"] + "," - organizations_info = AWS_Organizations_Info( - account_details_email=organizations_metadata["Account"]["Email"], - account_details_name=organizations_metadata["Account"]["Name"], - account_details_arn=organizations_metadata["Account"]["Arn"], - account_details_org=organizations_metadata["Account"]["Arn"].split("/")[ - 1 - ], - account_details_tags=account_details_tags, - ) - return organizations_info - def set_aws_audit_info(self, arguments) -> AWS_Audit_Info: """ set_aws_audit_info returns the AWS_Audit_Info @@ -188,7 +119,9 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE current_audit_info.original_session = aws_provider.aws_session logger.info("Validating credentials ...") # Verificate if we have valid credentials - caller_identity = self.validate_credentials(current_audit_info.original_session) + caller_identity = validate_aws_credentials( + current_audit_info.original_session, input_regions + ) logger.info("Credentials validated") logger.info(f"Original caller identity UserId: {caller_identity['UserId']}") @@ -197,7 +130,7 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE current_audit_info.audited_account = caller_identity["Account"] current_audit_info.audited_identity_arn = caller_identity["Arn"] current_audit_info.audited_user_id = caller_identity["UserId"] - current_audit_info.audited_partition = arnparse( + current_audit_info.audited_partition = parse_iam_credentials_arn( caller_identity["Arn"] ).partition @@ -210,8 +143,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE # Check if role arn is valid try: - # this returns the arn already parsed, calls arnparse, into a dict to be used when it is needed to access its fields - role_arn_parsed = arn_parsing( + # this returns the arn already parsed into a dict to be used when it is needed to access its fields + role_arn_parsed = parse_iam_credentials_arn( current_audit_info.assumed_role_info.role_arn ) @@ -226,10 +159,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE assumed_credentials = assume_role( aws_provider.aws_session, aws_provider.role_info ) - current_audit_info.organizations_metadata = ( - self.get_organizations_metadata( - current_audit_info.audited_account, assumed_credentials - ) + current_audit_info.organizations_metadata = get_organizations_metadata( + current_audit_info.audited_account, assumed_credentials ) logger.info("Organizations metadata retrieved") @@ -243,8 +174,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE # Check if role arn is valid try: - # this returns the arn already parsed, calls arnparse, into a dict to be used when it is needed to access its fields - role_arn_parsed = arn_parsing( + # this returns the arn already parsed into a dict to be used when it is needed to access its fields + role_arn_parsed = parse_iam_credentials_arn( current_audit_info.assumed_role_info.role_arn ) @@ -293,7 +224,7 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE current_audit_info.profile_region = "us-east-1" if not arguments.get("only_logs"): - self.print_aws_credentials(current_audit_info) + print_aws_credentials(current_audit_info) # Parse Scan Tags if arguments.get("resource_tags"): diff --git a/pyproject.toml b/pyproject.toml index 8ebb1f5b..c541039a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,6 @@ version = "3.5.1" [tool.poetry.dependencies] alive-progress = "3.1.1" -arnparse = "0.0.2" azure-identity = "1.12.0" azure-mgmt-authorization = "3.0.0" azure-mgmt-security = "5.0.0" diff --git a/tests/providers/aws/lib/arn/arn_test.py b/tests/providers/aws/lib/arn/arn_test.py index 70df8e41..ebaaed09 100644 --- a/tests/providers/aws/lib/arn/arn_test.py +++ b/tests/providers/aws/lib/arn/arn_test.py @@ -1,30 +1,255 @@ import sure # noqa +from pytest import raises -from prowler.providers.aws.lib.arn.arn import arn_parsing, is_valid_arn +from prowler.providers.aws.lib.arn.arn import is_valid_arn, parse_iam_credentials_arn +from prowler.providers.aws.lib.arn.error import ( + RoleArnParsingEmptyResource, + RoleArnParsingFailedMissingFields, + RoleArnParsingIAMRegionNotEmpty, + RoleArnParsingInvalidAccountID, + RoleArnParsingInvalidResourceType, + RoleArnParsingPartitionEmpty, + RoleArnParsingServiceNotIAMnorSTS, +) +from prowler.providers.aws.lib.arn.models import ARN ACCOUNT_ID = "123456789012" -RESOURCE_TYPE = "role" +RESOURCE_TYPE_ROLE = "role" +RESOUCE_TYPE_USER = "user" IAM_ROLE = "test-role" +IAM_SERVICE = "iam" +COMMERCIAL_PARTITION = "aws" +CHINA_PARTITION = "aws-cn" +GOVCLOUD_PARTITION = "aws-us-gov" class Test_ARN_Parsing: - def test_arn_parsing(self): + def test_ARN_model(self): + # https://gist.github.com/cmawhorter/80bf94f12bf7516d50a7d61ed28859d3 + test_cases = [ + "arn:aws:elasticbeanstalk:us-east-1:123456789012:environment/My App/MyEnvironment", + "arn:aws:iam::123456789012:user/David", + "arn:aws:rds:eu-west-1:123456789012:db:mysql-db", + "arn:aws:s3:::my_corporate_bucket/exampleobject.png", + "arn:aws:artifact:::report-package/Certifications and Attestations/SOC/*", + "arn:aws:artifact:::report-package/Certifications and Attestations/ISO/*", + "arn:aws:artifact:::report-package/Certifications and Attestations/PCI/*", + # "arn:aws:autoscaling:us-east-1:123456789012:scalingPolicy:c7a27f55-d35e-4153-b044-8ca9155fc467:autoScalingGroupName/my-test-asg1:policyName/my-scaleout-policy", + "arn:aws:acm:us-east-1:123456789012:certificate/12345678-1234-1234-1234-123456789012", + "arn:aws:cloudformation:us-east-1:123456789012:stack/MyProductionStack/abc9dbf0-43c2-11e3-a6e8-50fa526be49c", + "arn:aws:cloudformation:us-east-1:123456789012:changeSet/MyProductionChangeSet/abc9dbf0-43c2-11e3-a6e8-50fa526be49c", + "arn:aws:cloudsearch:us-east-1:123456789012:domain/imdb-movies", + "arn:aws:cloudtrail:us-east-1:123456789012:trail/mytrailname", + "arn:aws:events:us-east-1:*:*", + "arn:aws:events:us-east-1:account-id:*", + "arn:aws:events:us-east-1:account-id:rule/rule_name", + "arn:aws:logs:us-east-1:*:*", + "arn:aws:logs:us-east-1:account-id:*", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name:*", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name_prefix*", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name:log-stream:log_stream_name", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name:log-stream:log_stream_name_prefix*", + "arn:aws:logs:us-east-1:account-id:log-group:log_group_name_prefix*:log-stream:log_stream_name_prefix*", + "arn:aws:codebuild:us-east-1:123456789012:project/my-demo-project", + "arn:aws:codebuild:us-east-1:123456789012:build/my-demo-project:7b7416ae-89b4-46cc-8236-61129df660ad", + "arn:aws:codecommit:us-east-1:123456789012:MyDemoRepo", + "arn:aws:codedeploy:us-east-1:123456789012:application:WordPress_App", + "arn:aws:codedeploy:us-east-1:123456789012:instance/AssetTag*", + "arn:aws:config:us-east-1:123456789012:config-rule/MyConfigRule", + "arn:aws:codepipeline:us-east-1:123456789012:MyDemoPipeline", + "arn:aws:directconnect:us-east-1:123456789012:dxcon/dxcon-fgase048", + "arn:aws:directconnect:us-east-1:123456789012:dxvif/dxvif-fgrb110x", + "arn:aws:dynamodb:us-east-1:123456789012:table/books_table", + "arn:aws:ecr:us-east-1:123456789012:repository/my-repository", + "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster", + "arn:aws:ecs:us-east-1:123456789012:container-instance/403125b0-555c-4473-86b5-65982db28a6d", + "arn:aws:ecs:us-east-1:123456789012:task-definition/hello_world:8", + "arn:aws:ecs:us-east-1:123456789012:service/sample-webapp", + "arn:aws:ecs:us-east-1:123456789012:task/1abf0f6d-a411-4033-b8eb-a4eed3ad252a", + "arn:aws:ecs:us-east-1:123456789012:container/476e7c41-17f2-4c17-9d14-412566202c8a", + "arn:aws:ec2:us-east-1:123456789012:dedicated-host/h-12345678", + "arn:aws:ec2:us-east-1::image/ami-1a2b3c4d", + "arn:aws:ec2:us-east-1:123456789012:instance/*", + "arn:aws:ec2:us-east-1:123456789012:volume/*", + "arn:aws:ec2:us-east-1:123456789012:volume/vol-1a2b3c4d", + "arn:aws:elasticbeanstalk:us-east-1:123456789012:application/My App", + "arn:aws:elasticbeanstalk:us-east-1:123456789012:applicationversion/My App/My Version", + "arn:aws:elasticbeanstalk:us-east-1:123456789012:environment/My App/MyEnvironment", + "arn:aws:elasticbeanstalk:us-east-1::solutionstack/32bit Amazon Linux running Tomcat 7", + "arn:aws:elasticbeanstalk:us-east-1:123456789012:configurationtemplate/My App/My Template", + "arn:aws:elasticfilesystem:us-east-1:123456789012:file-system-id/fs12345678", + "arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/app/my-load-balancer/50dc6c495c0c9188", + "arn:aws:elasticloadbalancing:us-east-1:123456789012:listener/app/my-load-balancer/50dc6c495c0c9188/f2f7dc8efc522ab2", + "arn:aws:elasticloadbalancing:us-east-1:123456789012:listener-rule/app/my-load-balancer/50dc6c495c0c9188/f2f7dc8efc522ab2/9683b2d02a6cabee", + "arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/my-targets/73e2d6bc24d8a067", + "arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/my-load-balancer", + "arn:aws:elastictranscoder:us-east-1:123456789012:preset/*", + "arn:aws:elasticache:us-west-2:123456789012:cluster:myCluster", + "arn:aws:elasticache:us-west-2:123456789012:snapshot:mySnapshot", + "arn:aws:es:us-east-1:123456789012:domain/streaming-logs", + "arn:aws:glacier:us-east-1:123456789012:vaults/examplevault", + "arn:aws:glacier:us-east-1:123456789012:vaults/example*", + "arn:aws:glacier:us-east-1:123456789012:vaults/*", + "arn:aws:health:us-east-1::event/AWS_EC2_EXAMPLE_ID", + "arn:aws:health:us-east-1:123456789012:entity/AVh5GGT7ul1arKr1sE1K", + "arn:aws:iam::123456789012:root", + "arn:aws:iam::123456789012:user/Bob", + "arn:aws:iam::123456789012:user/division_abc/subdivision_xyz/Bob", + "arn:aws:iam::123456789012:group/Developers", + "arn:aws:iam::123456789012:group/division_abc/subdivision_xyz/product_A/Developers", + "arn:aws:iam::123456789012:role/S3Access", + "arn:aws:iam::123456789012:role/application_abc/component_xyz/S3Access", + "arn:aws:iam::123456789012:policy/UsersManageOwnCredentials", + "arn:aws:iam::123456789012:policy/division_abc/subdivision_xyz/UsersManageOwnCredentials", + "arn:aws:iam::123456789012:instance-profile/Webserver", + "arn:aws:sts::123456789012:federated-user/Bob", + "arn:aws:sts::123456789012:assumed-role/Accounting-Role/Mary", + "arn:aws:iam::123456789012:mfa/BobJonesMFA", + "arn:aws:iam::123456789012:server-certificate/ProdServerCert", + "arn:aws:iam::123456789012:server-certificate/division_abc/subdivision_xyz/ProdServerCert", + "arn:aws:iam::123456789012:saml-provider/ADFSProvider", + "arn:aws:iam::123456789012:oidc-provider/GoogleProvider", + "arn:aws:iot:your-region:123456789012:cert/123a456b789c123d456e789f123a456b789c123d456e789f123a456b789c123c456d7", + "arn:aws:iot::123456789012:policy/MyIoTPolicy", + "arn:aws:iot:your-region:123456789012:rule/MyIoTRule", + "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012", + "arn:aws:kms:us-east-1:123456789012:alias/example-alias", + "arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream-name", + "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream-name", + "arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords", + "arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords:your alias", + "arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords:1.0", + "arn:aws:lambda:us-east-1:123456789012:event-source-mappings:kinesis-stream-arn", + "arn:aws:machinelearning:us-east-1:123456789012:datasource/my-datasource-1", + "arn:aws:machinelearning:us-east-1:123456789012:mlmodel/my-mlmodel", + "arn:aws:machinelearning:us-east-1:123456789012:batchprediction/my-batchprediction", + "arn:aws:machinelearning:us-east-1:123456789012:evaluation/my-evaluation", + "arn:aws:polly:us-east-1:123456789012:lexicon/myLexicon", + "arn:aws:redshift:us-east-1:123456789012:cluster:my-cluster", + "arn:aws:redshift:us-east-1:123456789012:my-cluster/my-dbuser-name", + "arn:aws:redshift:us-east-1:123456789012:parametergroup:my-parameter-group", + "arn:aws:redshift:us-east-1:123456789012:securitygroup:my-public-group", + "arn:aws:redshift:us-east-1:123456789012:snapshot:my-cluster/my-snapshot20130807", + "arn:aws:redshift:us-east-1:123456789012:subnetgroup:my-subnet-10 ", + "arn:aws:rds:us-east-1:123456789012:db:mysql-db-instance1", + "arn:aws:rds:us-east-1:123456789012:snapshot:my-snapshot2", + "arn:aws:rds:us-east-1:123456789012:cluster:my-cluster1", + "arn:aws:rds:us-east-1:123456789012:cluster-snapshot:cluster1-snapshot7", + "arn:aws:rds:us-east-1:123456789012:og:mysql-option-group1", + "arn:aws:rds:us-east-1:123456789012:pg:mysql-repl-pg1", + "arn:aws:rds:us-east-1:123456789012:cluster-pg:aurora-pg3", + "arn:aws:rds:us-east-1:123456789012:secgrp:dev-secgrp2", + "arn:aws:rds:us-east-1:123456789012:subgrp:prod-subgrp1", + "arn:aws:rds:us-east-1:123456789012:es:monitor-events2", + "arn:aws:route53:::hostedzone/Z148QEXAMPLE8V", + "arn:aws:route53:::change/C2RDJ5EXAMPLE2", + "arn:aws:route53:::change/*", + "arn:aws:ssm:us-east-1:123456789012:document/highAvailabilityServerSetup", + "arn:aws:sns:*:123456789012:my_corporate_topic", + "arn:aws:sns:us-east-1:123456789012:my_corporate_topic:02034b43-fefa-4e07-a5eb-3be56f8c54ce", + "arn:aws:sqs:us-east-1:123456789012:queue1", + "arn:aws:s3:::my_corporate_bucket", + "arn:aws:s3:::my_corporate_bucket/exampleobject.png", + "arn:aws:s3:::my_corporate_bucket/*", + "arn:aws:s3:::my_corporate_bucket/Development/*", + "arn:aws:swf:us-east-1:123456789012:/domain/department1", + "arn:aws:swf:*:123456789012:/domain/*", + "arn:aws:states:us-east-1:123456789012:activity:HelloActivity", + "arn:aws:states:us-east-1:123456789012:stateMachine:HelloStateMachine", + "arn:aws:states:us-east-1:123456789012:execution:HelloStateMachine:HelloStateMachineExecution", + "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B", + "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/volume/vol-1122AABB", + "arn:aws:storagegateway:us-east-1:123456789012:tape/AMZNC8A26D", + "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/target/iqn.1997-05.com.amazon:vol-1122AABB", + "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/device/AMZN_SGW-FF22CCDD_TAPEDRIVE_00010", + "arn:aws:trustedadvisor:*:123456789012:checks/fault_tolerance/BueAdJ7NrP", + "arn:aws:waf::123456789012:rule/41b5b052-1e4a-426b-8149-3595be6342c2", + "arn:aws:waf::123456789012:webacl/3bffd3ed-fa2e-445e-869f-a6a7cf153fd3", + "arn:aws:waf::123456789012:ipset/3f74bd8c-f046-4970-a1a7-41aa52e05480", + "arn:aws:waf::123456789012:bytematchset/d131bc0b-57be-4536-af1d-4894fd28acc4", + "arn:aws:waf::123456789012:sqlinjectionset/2be79d6f-2f41-4c9b-8192-d719676873f0", + "arn:aws:waf::123456789012:changetoken/03ba2197-fc98-4ac0-a67d-5b839762b16b", + "arn:aws:iam::123456789012:user/Development/product_1234/*", + "arn:aws:s3:::my_corporate_bucket/*", + "arn:aws:s3:::my_corporate_bucket/Development/*", + ] + # For now we are only testing that the ARN library does not raise any exception with the above list of ARNs. + for arn in test_cases: + _ = ARN(arn) + + def test_iam_credentials_arn_parsing(self): test_cases = [ { - "input_arn": f"arn:aws:iam::{ACCOUNT_ID}:{RESOURCE_TYPE}/{IAM_ROLE}", + "input_arn": f"arn:aws:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}", "expected": { - "partition": "aws", - "service": "iam", + "partition": COMMERCIAL_PARTITION, + "service": IAM_SERVICE, "region": None, "account_id": ACCOUNT_ID, - "resource_type": RESOURCE_TYPE, + "resource_type": RESOURCE_TYPE_ROLE, "resource": IAM_ROLE, }, - } + }, + { + "input_arn": f"arn:aws:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}", + "expected": { + "partition": COMMERCIAL_PARTITION, + "service": IAM_SERVICE, + "region": None, + "account_id": ACCOUNT_ID, + "resource_type": RESOUCE_TYPE_USER, + "resource": IAM_ROLE, + }, + }, + { + "input_arn": f"arn:{CHINA_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}", + "expected": { + "partition": CHINA_PARTITION, + "service": IAM_SERVICE, + "region": None, + "account_id": ACCOUNT_ID, + "resource_type": RESOURCE_TYPE_ROLE, + "resource": IAM_ROLE, + }, + }, + { + "input_arn": f"arn:{CHINA_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}", + "expected": { + "partition": CHINA_PARTITION, + "service": IAM_SERVICE, + "region": None, + "account_id": ACCOUNT_ID, + "resource_type": RESOUCE_TYPE_USER, + "resource": IAM_ROLE, + }, + }, + { + "input_arn": f"arn:{GOVCLOUD_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}", + "expected": { + "partition": GOVCLOUD_PARTITION, + "service": IAM_SERVICE, + "region": None, + "account_id": ACCOUNT_ID, + "resource_type": RESOURCE_TYPE_ROLE, + "resource": IAM_ROLE, + }, + }, + { + "input_arn": f"arn:{GOVCLOUD_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}", + "expected": { + "partition": GOVCLOUD_PARTITION, + "service": IAM_SERVICE, + "region": None, + "account_id": ACCOUNT_ID, + "resource_type": RESOUCE_TYPE_USER, + "resource": IAM_ROLE, + }, + }, ] for test in test_cases: input_arn = test["input_arn"] - parsed_arn = arn_parsing(input_arn) + parsed_arn = parse_iam_credentials_arn(input_arn) parsed_arn.partition.should.equal(test["expected"]["partition"]) parsed_arn.service.should.equal(test["expected"]["service"]) parsed_arn.region.should.equal(test["expected"]["region"]) @@ -32,6 +257,59 @@ class Test_ARN_Parsing: parsed_arn.resource_type.should.equal(test["expected"]["resource_type"]) parsed_arn.resource.should.equal(test["expected"]["resource"]) + def test_iam_credentials_arn_parsing_raising_RoleArnParsingFailedMissingFields( + self, + ): + input_arn = "" + with raises(RoleArnParsingFailedMissingFields) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingFailedMissingFields + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingIAMRegionNotEmpty(self): + input_arn = "arn:aws:iam:eu-west-1:111111111111:user/prowler" + with raises(RoleArnParsingIAMRegionNotEmpty) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingIAMRegionNotEmpty + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingPartitionEmpty(self): + input_arn = "arn::iam::111111111111:user/prowler" + with raises(RoleArnParsingPartitionEmpty) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingPartitionEmpty + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingServiceNotIAM(self): + input_arn = "arn:aws:s3::111111111111:user/prowler" + with raises(RoleArnParsingServiceNotIAMnorSTS) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingServiceNotIAMnorSTS + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingInvalidAccountID(self): + input_arn = "arn:aws:iam::AWS_ACCOUNT_ID:user/prowler" + with raises(RoleArnParsingInvalidAccountID) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingInvalidAccountID + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingInvalidResourceType( + self, + ): + input_arn = "arn:aws:iam::111111111111:account/prowler" + with raises(RoleArnParsingInvalidResourceType) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingInvalidResourceType + + def test_iam_credentials_arn_parsing_raising_RoleArnParsingEmptyResource(self): + input_arn = "arn:aws:iam::111111111111:role/" + with raises(RoleArnParsingEmptyResource) as error: + parse_iam_credentials_arn(input_arn) + + assert error._excinfo[0] == RoleArnParsingEmptyResource + def test_is_valid_arn(self): assert is_valid_arn("arn:aws:iam::012345678910:user/test") assert is_valid_arn("arn:aws-cn:ec2:us-east-1:123456789012:vpc/vpc-12345678") diff --git a/tests/providers/aws/lib/credentials/credentials_test.py b/tests/providers/aws/lib/credentials/credentials_test.py new file mode 100644 index 00000000..61abeaa4 --- /dev/null +++ b/tests/providers/aws/lib/credentials/credentials_test.py @@ -0,0 +1,312 @@ +import re + +import boto3 +import botocore +from mock import patch +from moto import mock_iam, mock_sts + +from prowler.providers.aws.lib.arn.arn import parse_iam_credentials_arn +from prowler.providers.aws.lib.credentials.credentials import validate_aws_credentials + +AWS_ACCOUNT_NUMBER = "123456789012" + + +# Mocking GetCallerIdentity for China and GovCloud +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_get_caller_identity_china(self, operation_name, kwarg): + if operation_name == "GetCallerIdentity": + return { + "UserId": "XXXXXXXXXXXXXXXXXXXXX", + "Account": AWS_ACCOUNT_NUMBER, + "Arn": f"arn:aws-cn:iam::{AWS_ACCOUNT_NUMBER}:user/test-user", + } + + return make_api_call(self, operation_name, kwarg) + + +def mock_get_caller_identity_gov_cloud(self, operation_name, kwarg): + if operation_name == "GetCallerIdentity": + return { + "UserId": "XXXXXXXXXXXXXXXXXXXXX", + "Account": AWS_ACCOUNT_NUMBER, + "Arn": f"arn:aws-us-gov:iam::{AWS_ACCOUNT_NUMBER}:user/test-user", + } + + return make_api_call(self, operation_name, kwarg) + + +class Test_AWS_Credentials: + @mock_sts + @mock_iam + def test_validate_credentials_commercial_partition_with_regions(self): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials(session, [aws_region]) + + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + def test_validate_credentials_commercial_partition_with_regions_none_and_profile_region_so_profile_region( + self, + ): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials(session, None) + + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + def test_validate_credentials_commercial_partition_with_0_regions_and_profile_region_so_profile_region( + self, + ): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials(session, []) + + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + def test_validate_credentials_commercial_partition_without_regions_and_profile_region_so_us_east_1( + self, + ): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=None, + ) + + get_caller_identity = validate_aws_credentials(session, []) + + assert get_caller_identity["region"] == "us-east-1" + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + def test_validate_credentials_china_partition_without_regions_and_profile_region_so_us_east_1( + self, + ): + # AWS Region for AWS COMMERCIAL + aws_region = "eu-west-1" + aws_partition = "aws" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=None, + ) + + get_caller_identity = validate_aws_credentials(session, []) + + assert get_caller_identity["region"] == "us-east-1" + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + @patch( + "botocore.client.BaseClient._make_api_call", new=mock_get_caller_identity_china + ) + def test_validate_credentials_china_partition(self): + # AWS Region for AWS CHINA + aws_region = "cn-north-1" + aws_partition = "aws-cn" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials(session, [aws_region]) + + # To use GovCloud or China it is either required: + # - Set the AWS profile region with a valid partition region + # - Use the -f/--region with a valid partition region + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER + + @mock_sts + @mock_iam + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_get_caller_identity_gov_cloud, + ) + def test_validate_credentials_gov_cloud_partition(self): + # AWS Region for US GOV CLOUD + aws_region = "us-gov-east-1" + aws_partition = "aws-us-gov" + # Create a mock IAM user + iam_client = boto3.client("iam", region_name=aws_region) + iam_user = iam_client.create_user(UserName="test-user")["User"] + # Create a mock IAM access keys + access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ + "AccessKey" + ] + access_key_id = access_key["AccessKeyId"] + secret_access_key = access_key["SecretAccessKey"] + + # Create AWS session to validate + session = boto3.session.Session( + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + region_name=aws_region, + ) + + get_caller_identity = validate_aws_credentials(session, [aws_region]) + + # To use GovCloud or China it is either required: + # - Set the AWS profile region with a valid partition region + # - Use the -f/--region with a valid partition region + assert get_caller_identity["region"] == aws_region + + caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"]) + + assert caller_identity_arn.partition == aws_partition + assert caller_identity_arn.region is None + assert caller_identity_arn.resource == "test-user" + assert caller_identity_arn.resource_type == "user" + assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"]) + assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER diff --git a/tests/providers/aws/lib/organizations/organizations_test.py b/tests/providers/aws/lib/organizations/organizations_test.py new file mode 100644 index 00000000..78ecabd0 --- /dev/null +++ b/tests/providers/aws/lib/organizations/organizations_test.py @@ -0,0 +1,61 @@ +import json + +import boto3 +import sure # noqa +from moto import mock_iam, mock_organizations, mock_sts + +from prowler.providers.aws.lib.organizations.organizations import ( + get_organizations_metadata, +) + +AWS_ACCOUNT_NUMBER = "123456789012" + + +class Test_AWS_Organizations: + @mock_organizations + @mock_sts + @mock_iam + def test_organizations(self): + client = boto3.client("organizations", region_name="us-east-1") + iam_client = boto3.client("iam", region_name="us-east-1") + sts_client = boto3.client("sts", region_name="us-east-1") + + mockname = "mock-account" + mockdomain = "moto-example.org" + mockemail = "@".join([mockname, mockdomain]) + + org_id = client.create_organization(FeatureSet="ALL")["Organization"]["Id"] + account_id = client.create_account(AccountName=mockname, Email=mockemail)[ + "CreateAccountStatus" + ]["AccountId"] + + client.tag_resource( + ResourceId=account_id, Tags=[{"Key": "key", "Value": "value"}] + ) + + trust_policy_document = { + "Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Principal": {"AWS": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"}, + "Action": "sts:AssumeRole", + }, + } + iam_role_arn = iam_client.role_arn = iam_client.create_role( + RoleName="test-role", + AssumeRolePolicyDocument=json.dumps(trust_policy_document), + )["Role"]["Arn"] + session_name = "new-session" + assumed_role = sts_client.assume_role( + RoleArn=iam_role_arn, RoleSessionName=session_name + ) + + org = get_organizations_metadata(account_id, assumed_role) + + org.account_details_email.should.equal(mockemail) + org.account_details_name.should.equal(mockname) + org.account_details_arn.should.equal( + f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/{org_id}/{account_id}" + ) + org.account_details_org.should.equal(org_id) + org.account_details_tags.should.equal("key:value,") diff --git a/tests/providers/common/audit_info_test.py b/tests/providers/common/audit_info_test.py index fc1f52b2..93049aa8 100644 --- a/tests/providers/common/audit_info_test.py +++ b/tests/providers/common/audit_info_test.py @@ -1,15 +1,9 @@ -import json - import boto3 +import botocore import sure # noqa +from boto3 import session from mock import patch -from moto import ( - mock_ec2, - mock_iam, - mock_organizations, - mock_resourcegroupstaggingapi, - mock_sts, -) +from moto import mock_ec2, mock_resourcegroupstaggingapi from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from prowler.providers.azure.azure_provider import Azure_Provider @@ -26,24 +20,8 @@ from prowler.providers.gcp.gcp_provider import GCP_Provider from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info EXAMPLE_AMI_ID = "ami-12c6146b" -ACCOUNT_ID = 123456789012 -mock_current_audit_info = AWS_Audit_Info( - session_config=None, - original_session=None, - audit_session=None, - audited_account="123456789012", - audited_identity_arn="arn:aws:iam::123456789012:user/test", - audited_user_id="test", - audited_partition="aws", - profile="default", - profile_region="eu-west-1", - credentials=None, - assumed_role_info=None, - audited_regions=["eu-west-2", "eu-west-1"], - organizations_metadata=None, - audit_resources=None, - audit_metadata=None, -) +AWS_ACCOUNT_NUMBER = "123456789012" + mock_azure_audit_info = Azure_Audit_Info( credentials=None, @@ -54,6 +32,31 @@ mock_azure_audit_info = Azure_Audit_Info( mock_set_audit_info = Audit_Info() +# Mocking GetCallerIdentity for China and GovCloud +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_get_caller_identity_china(self, operation_name, kwarg): + if operation_name == "GetCallerIdentity": + return { + "UserId": "XXXXXXXXXXXXXXXXXXXXX", + "Account": AWS_ACCOUNT_NUMBER, + "Arn": f"arn:aws-cn:iam::{AWS_ACCOUNT_NUMBER}:user/test-user", + } + + return make_api_call(self, operation_name, kwarg) + + +def mock_get_caller_identity_gov_cloud(self, operation_name, kwarg): + if operation_name == "GetCallerIdentity": + return { + "UserId": "XXXXXXXXXXXXXXXXXXXXX", + "Account": AWS_ACCOUNT_NUMBER, + "Arn": f"arn:aws-us-gov:iam::{AWS_ACCOUNT_NUMBER}:user/test-user", + } + + return make_api_call(self, operation_name, kwarg) + def mock_validate_credentials(*_): caller_identity = { @@ -81,118 +84,60 @@ def mock_set_gcp_credentials(*_): class Test_Set_Audit_Info: - @patch( - "prowler.providers.common.audit_info.current_audit_info", - new=mock_current_audit_info, - ) - @mock_sts - @mock_iam - def test_validate_credentials(self): - # Create a mock IAM user - iam_client = boto3.client("iam", region_name="us-east-1") - iam_user = iam_client.create_user(UserName="test-user")["User"] - # Create a mock IAM access keys - access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ - "AccessKey" - ] - access_key_id = access_key["AccessKeyId"] - secret_access_key = access_key["SecretAccessKey"] - # Create AWS session to validate - session = boto3.session.Session( - aws_access_key_id=access_key_id, - aws_secret_access_key=secret_access_key, - region_name="us-east-1", + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn="arn:aws:iam::123456789012:user/test", + profile=None, + profile_region="eu-west-1", + credentials=None, + assumed_role_info=None, + audited_regions=["eu-west-2", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, ) - audit_info = Audit_Info() - get_caller_identity = audit_info.validate_credentials(session) - get_caller_identity["Arn"].should.equal(iam_user["Arn"]) - get_caller_identity["UserId"].should.equal(iam_user["UserId"]) - # assert get_caller_identity["UserId"] == str(ACCOUNT_ID) + return audit_info @patch( - "prowler.providers.common.audit_info.current_audit_info", - new=mock_current_audit_info, + "prowler.providers.common.audit_info.validate_aws_credentials", + new=mock_validate_credentials, ) - @mock_organizations - @mock_sts - @mock_iam - def test_organizations(self): - client = boto3.client("organizations", region_name="us-east-1") - iam_client = boto3.client("iam", region_name="us-east-1") - sts_client = boto3.client("sts", region_name="us-east-1") - - mockname = "mock-account" - mockdomain = "moto-example.org" - mockemail = "@".join([mockname, mockdomain]) - - org_id = client.create_organization(FeatureSet="ALL")["Organization"]["Id"] - account_id = client.create_account(AccountName=mockname, Email=mockemail)[ - "CreateAccountStatus" - ]["AccountId"] - - client.tag_resource( - ResourceId=account_id, Tags=[{"Key": "key", "Value": "value"}] - ) - - trust_policy_document = { - "Version": "2012-10-17", - "Statement": { - "Effect": "Allow", - "Principal": { - "AWS": "arn:aws:iam::{account_id}:root".format( - account_id=ACCOUNT_ID - ) - }, - "Action": "sts:AssumeRole", - }, - } - iam_role_arn = iam_client.role_arn = iam_client.create_role( - RoleName="test-role", - AssumeRolePolicyDocument=json.dumps(trust_policy_document), - )["Role"]["Arn"] - session_name = "new-session" - assumed_role = sts_client.assume_role( - RoleArn=iam_role_arn, RoleSessionName=session_name - ) - - audit_info = Audit_Info() - org = audit_info.get_organizations_metadata(account_id, assumed_role) - - org.account_details_email.should.equal(mockemail) - org.account_details_name.should.equal(mockname) - org.account_details_arn.should.equal( - "arn:aws:organizations::{0}:account/{1}/{2}".format( - ACCOUNT_ID, org_id, account_id - ) - ) - org.account_details_org.should.equal(org_id) - org.account_details_tags.should.equal("key:value,") - @patch( - "prowler.providers.common.audit_info.current_audit_info", - new=mock_current_audit_info, + "prowler.providers.common.audit_info.print_aws_credentials", + new=mock_print_audit_credentials, ) - @patch.object(Audit_Info, "validate_credentials", new=mock_validate_credentials) - @patch.object(Audit_Info, "print_aws_credentials", new=mock_print_audit_credentials) def test_set_audit_info_aws(self): - provider = "aws" - arguments = { - "profile": None, - "role": None, - "session_duration": None, - "external_id": None, - "regions": None, - "organizations_role": None, - "subscriptions": None, - "az_cli_auth": None, - "sp_env_auth": None, - "browser_auth": None, - "managed_entity_auth": None, - } + with patch( + "prowler.providers.common.audit_info.current_audit_info", + new=self.set_mocked_audit_info(), + ): + provider = "aws" + arguments = { + "profile": None, + "role": None, + "session_duration": None, + "external_id": None, + "regions": None, + "organizations_role": None, + "subscriptions": None, + "az_cli_auth": None, + "sp_env_auth": None, + "browser_auth": None, + "managed_entity_auth": None, + } - audit_info = set_provider_audit_info(provider, arguments) - assert isinstance(audit_info, AWS_Audit_Info) + audit_info = set_provider_audit_info(provider, arguments) + assert isinstance(audit_info, AWS_Audit_Info) @patch( "prowler.providers.common.audit_info.azure_audit_info", @@ -242,45 +187,48 @@ class Test_Set_Audit_Info: @mock_resourcegroupstaggingapi @mock_ec2 def test_get_tagged_resources(self): - client = boto3.client("ec2", region_name="eu-central-1") - instances = client.run_instances( - ImageId=EXAMPLE_AMI_ID, - MinCount=1, - MaxCount=1, - InstanceType="t2.micro", - TagSpecifications=[ - { - "ResourceType": "instance", - "Tags": [ - {"Key": "MY_TAG1", "Value": "MY_VALUE1"}, - {"Key": "MY_TAG2", "Value": "MY_VALUE2"}, - ], - }, - { - "ResourceType": "instance", - "Tags": [{"Key": "ami", "Value": "test"}], - }, - ], - ) - instance_id = instances["Instances"][0]["InstanceId"] - image_id = client.create_image(Name="testami", InstanceId=instance_id)[ - "ImageId" - ] - client.create_tags(Resources=[image_id], Tags=[{"Key": "ami", "Value": "test"}]) + with patch( + "prowler.providers.common.audit_info.current_audit_info", + new=self.set_mocked_audit_info(), + ) as mock_audit_info: + client = boto3.client("ec2", region_name="eu-central-1") + instances = client.run_instances( + ImageId=EXAMPLE_AMI_ID, + MinCount=1, + MaxCount=1, + InstanceType="t2.micro", + TagSpecifications=[ + { + "ResourceType": "instance", + "Tags": [ + {"Key": "MY_TAG1", "Value": "MY_VALUE1"}, + {"Key": "MY_TAG2", "Value": "MY_VALUE2"}, + ], + }, + { + "ResourceType": "instance", + "Tags": [{"Key": "ami", "Value": "test"}], + }, + ], + ) + instance_id = instances["Instances"][0]["InstanceId"] + image_id = client.create_image(Name="testami", InstanceId=instance_id)[ + "ImageId" + ] + client.create_tags( + Resources=[image_id], Tags=[{"Key": "ami", "Value": "test"}] + ) - mock_current_audit_info.audited_regions = ["eu-central-1"] - mock_current_audit_info.audit_session = boto3.session.Session() - assert len(get_tagged_resources(["ami=test"], mock_current_audit_info)) == 2 - assert image_id in str( - get_tagged_resources(["ami=test"], mock_current_audit_info) - ) - assert instance_id in str( - get_tagged_resources(["ami=test"], mock_current_audit_info) - ) - assert ( - len(get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_current_audit_info)) - == 1 - ) - assert instance_id in str( - get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_current_audit_info) - ) + mock_audit_info.audited_regions = ["eu-central-1"] + mock_audit_info.audit_session = boto3.session.Session() + assert len(get_tagged_resources(["ami=test"], mock_audit_info)) == 2 + assert image_id in str(get_tagged_resources(["ami=test"], mock_audit_info)) + assert instance_id in str( + get_tagged_resources(["ami=test"], mock_audit_info) + ) + assert ( + len(get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_audit_info)) == 1 + ) + assert instance_id in str( + get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_audit_info) + )