fix(sts): Use the right region to validate credentials (#2349)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Pepe Fagoaga
2023-05-18 15:51:57 +02:00
committed by GitHub
parent 0bd26b19d7
commit e84f5f184e
18 changed files with 1089 additions and 345 deletions

View File

@@ -0,0 +1,79 @@
# AWS Regions and Partitions
By default Prowler is able to scan the following AWS partitions:
- Commercial: `aws`
- China: `aws-cn`
- GovCloud (US): `aws-us-gov`
> To check the available regions for each partition and service please refer to the following document [aws_regions_by_service.json](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/aws/aws_regions_by_service.json)
It is important to take into consideration that to scan the China (`aws-cn`) or GovCloud (`aws-us-gov`) partitions it is either required to have a valid region for that partition in your AWS credentials (Refer to https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials for more information) or to specify the regions you want to audit for that partition using the `-f/--region` flag.
You can get more information about the available partitions and regions in the following [Botocore](https://github.com/boto/botocore) file https://github.com/boto/botocore/blob/22a19ea7c4c2c4dd7df4ab8c32733cba0c7597a4/botocore/data/partitions.json
## AWS China
To scan your AWS Account in the China partition (`aws-cn`):
- Using the `-f/--region` flag:
```
prowler aws --region cn-north-1 cn-northwest-1
```
- Using the region configured in your AWS profile at `~/.aws/credentials` or `~/.aws/config`:
```
[default]
aws_access_key_id = XXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXX
region = cn-north-1
```
> With this option all the partition regions will be scanned without the need of use the `-f/--region` flag
## AWS GovCloud (US)
To scan your AWS Account in the GovCloud (US) partition (`aws-us-gov`):
- Using the `-f/--region` flag:
```
prowler aws --region us-gov-east-1 us-gov-west-1
```
- Using the region configured in your AWS profile at `~/.aws/credentials` or `~/.aws/config`:
```
[default]
aws_access_key_id = XXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXX
region = us-gov-east-1
```
> With this option all the partition regions will be scanned without the need of use the `-f/--region` flag
## AWS ISO (US & Europe)
For the AWS ISO partitions, which are known as "secret partitions" and are air-gapped from the internet there is no builtin way to scanned it. In this scenario if you want to audit an AWS Account in one of the AWS ISO partitions you should manually update the [aws_regions_by_service.json](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/aws/aws_regions_by_service.json) and include the partition, region and services, e.g.:
```json
"iam": {
"regions": {
"aws": [
"eu-west-1",
"us-east-1",
],
"aws-cn": [
"cn-north-1",
"cn-northwest-1"
],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
],
"aws-iso": [
"aws-iso-global",
"us-iso-east-1",
"us-iso-west-1"
],
"aws-iso-b": [
"aws-iso-b-global",
"us-isob-east-1"
],
"aws-iso-e": [],
}
},
```

View File

@@ -43,6 +43,7 @@ nav:
- Assume Role: tutorials/aws/role-assumption.md - Assume Role: tutorials/aws/role-assumption.md
- AWS Security Hub: tutorials/aws/securityhub.md - AWS Security Hub: tutorials/aws/securityhub.md
- AWS Organizations: tutorials/aws/organizations.md - AWS Organizations: tutorials/aws/organizations.md
- AWS Regions and Partitions: tutorials/aws/regions-and-partitions.md
- Scan Multiple AWS Accounts: tutorials/aws/multiaccount.md - Scan Multiple AWS Accounts: tutorials/aws/multiaccount.md
- AWS CloudShell: tutorials/aws/cloudshell.md - AWS CloudShell: tutorials/aws/cloudshell.md
- Checks v2 to v3 Mapping: tutorials/aws/v2_to_v3_checks_mapping.md - Checks v2 to v3 Mapping: tutorials/aws/v2_to_v3_checks_mapping.md

14
poetry.lock generated
View File

@@ -28,18 +28,6 @@ files = [
about-time = "4.2.1" about-time = "4.2.1"
grapheme = "0.6.0" grapheme = "0.6.0"
[[package]]
name = "arnparse"
version = "0.0.2"
description = "Parse ARNs using Python"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "arnparse-0.0.2-py2.py3-none-any.whl", hash = "sha256:b0906734e4b8f19e39b1e32944c6cd6274b6da90c066a83882ac7a11d27553e0"},
{file = "arnparse-0.0.2.tar.gz", hash = "sha256:cb87f17200d07121108a9085d4a09cc69a55582647776b9a917b0b1f279db8f8"},
]
[[package]] [[package]]
name = "astroid" name = "astroid"
version = "2.15.4" version = "2.15.4"
@@ -2887,4 +2875,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.9" python-versions = "^3.9"
content-hash = "d79eb4bb147bb8298a228002aa332ff8c8e380702eb81bf8012586c899890bfc" content-hash = "bf40bfed6a88bde379337be45f5be5dd468ea3c7c79ff4ff92448ead3fe79308"

View File

@@ -36,7 +36,7 @@ class AWS_Provider:
secret_key=audit_info.credentials.aws_secret_access_key, secret_key=audit_info.credentials.aws_secret_access_key,
token=audit_info.credentials.aws_session_token, token=audit_info.credentials.aws_session_token,
expiry_time=audit_info.credentials.expiration, expiry_time=audit_info.credentials.expiration,
refresh_using=self.refresh, refresh_using=self.refresh_credentials,
method="sts-assume-role", method="sts-assume-role",
) )
# Here we need the botocore session since it needs to use refreshable credentials # Here we need the botocore session since it needs to use refreshable credentials
@@ -60,7 +60,7 @@ class AWS_Provider:
# Refresh credentials method using assume role # Refresh credentials method using assume role
# This method is called "adding ()" to the name, so it cannot accept arguments # This method is called "adding ()" to the name, so it cannot accept arguments
# https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L570 # https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L570
def refresh(self): def refresh_credentials(self):
logger.info("Refreshing assumed credentials...") logger.info("Refreshing assumed credentials...")
response = assume_role(self.aws_session, self.role_info) response = assume_role(self.aws_session, self.role_info)

View File

@@ -1,50 +1,48 @@
import re import re
from arnparse import arnparse
from prowler.providers.aws.lib.arn.error import ( from prowler.providers.aws.lib.arn.error import (
RoleArnParsingEmptyResource, RoleArnParsingEmptyResource,
RoleArnParsingFailedMissingFields,
RoleArnParsingIAMRegionNotEmpty, RoleArnParsingIAMRegionNotEmpty,
RoleArnParsingInvalidAccountID, RoleArnParsingInvalidAccountID,
RoleArnParsingInvalidResourceType, RoleArnParsingInvalidResourceType,
RoleArnParsingPartitionEmpty, RoleArnParsingPartitionEmpty,
RoleArnParsingServiceNotIAM, RoleArnParsingServiceNotIAMnorSTS,
) )
from prowler.providers.aws.lib.arn.models import ARN
def arn_parsing(arn): def parse_iam_credentials_arn(arn: str) -> ARN:
# check for number of fields, must be six arn_parsed = ARN(arn)
if len(arn.split(":")) != 6: # First check if region is empty (in IAM ARN's region is always empty)
raise RoleArnParsingFailedMissingFields if arn_parsed.region:
raise RoleArnParsingIAMRegionNotEmpty
else: else:
arn_parsed = arnparse(arn) # check if needed fields are filled:
# First check if region is empty (in IAM arns region is always empty) # - partition
if arn_parsed.region is not None: # - service
raise RoleArnParsingIAMRegionNotEmpty # - account_id
# - resource_type
# - resource
if arn_parsed.partition is None or arn_parsed.partition == "":
raise RoleArnParsingPartitionEmpty
elif arn_parsed.service != "iam" and arn_parsed.service != "sts":
raise RoleArnParsingServiceNotIAMnorSTS
elif (
arn_parsed.account_id is None
or len(arn_parsed.account_id) != 12
or not arn_parsed.account_id.isnumeric()
):
raise RoleArnParsingInvalidAccountID
elif (
arn_parsed.resource_type != "role"
and arn_parsed.resource_type != "user"
and arn_parsed.resource_type != "assumed-role"
):
raise RoleArnParsingInvalidResourceType
elif arn_parsed.resource == "":
raise RoleArnParsingEmptyResource
else: else:
# check if needed fields are filled: return arn_parsed
# - partition
# - service
# - account_id
# - resource_type
# - resource
if arn_parsed.partition is None:
raise RoleArnParsingPartitionEmpty
elif arn_parsed.service != "iam":
raise RoleArnParsingServiceNotIAM
elif (
arn_parsed.account_id is None
or len(arn_parsed.account_id) != 12
or not arn_parsed.account_id.isnumeric()
):
raise RoleArnParsingInvalidAccountID
elif arn_parsed.resource_type != "role":
raise RoleArnParsingInvalidResourceType
elif arn_parsed.resource == "":
raise RoleArnParsingEmptyResource
else:
return arn_parsed
def is_valid_arn(arn: str) -> bool: def is_valid_arn(arn: str) -> bool:

View File

@@ -1,43 +1,49 @@
class RoleArnParsingFailedMissingFields(Exception): class RoleArnParsingFailedMissingFields(Exception):
# The arn contains a numberof fields different than six separated by :" # The ARN contains a numberof fields different than six separated by :"
def __init__(self): def __init__(self):
self.message = "The assumed role arn contains a number of fields different than six separated by :, please input a valid arn" self.message = "The assumed role ARN contains an invalid number of fields separated by : or it does not start by arn, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingIAMRegionNotEmpty(Exception): class RoleArnParsingIAMRegionNotEmpty(Exception):
# The arn contains a non-empty value for region, since it is an IAM arn is not valid # The ARN contains a non-empty value for region, since it is an IAM ARN is not valid
def __init__(self): def __init__(self):
self.message = "The assumed role arn contains a non-empty value for region, since it is an IAM arn is not valid, please input a valid arn" self.message = "The assumed role ARN contains a non-empty value for region, since it is an IAM ARN is not valid, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingPartitionEmpty(Exception): class RoleArnParsingPartitionEmpty(Exception):
# The arn contains an empty value for partition # The ARN contains an empty value for partition
def __init__(self): def __init__(self):
self.message = "The assumed role arn does not contain a value for partition, please input a valid arn" self.message = "The assumed role ARN does not contain a value for partition, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingServiceNotIAM(Exception): class RoleArnParsingServiceNotIAMnorSTS(Exception):
def __init__(self): def __init__(self):
self.message = "The assumed role arn contains a value for service distinct than iam, please input a valid arn" self.message = "The assumed role ARN contains a value for service distinct than IAM or STS, please input a valid ARN"
super().__init__(self.message)
class RoleArnParsingServiceNotSTS(Exception):
def __init__(self):
self.message = "The assumed role ARN contains a value for service distinct than STS, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingInvalidAccountID(Exception): class RoleArnParsingInvalidAccountID(Exception):
def __init__(self): def __init__(self):
self.message = "The assumed role arn contains a value for account id empty or invalid, a valid account id must be composed of 12 numbers, please input a valid arn" self.message = "The assumed role ARN contains a value for account id empty or invalid, a valid account id must be composed of 12 numbers, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingInvalidResourceType(Exception): class RoleArnParsingInvalidResourceType(Exception):
def __init__(self): def __init__(self):
self.message = "The assumed role arn contains a value for resource type different than role, please input a valid arn" self.message = "The assumed role ARN contains a value for resource type different than role, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)
class RoleArnParsingEmptyResource(Exception): class RoleArnParsingEmptyResource(Exception):
def __init__(self): def __init__(self):
self.message = "The assumed role arn does not contain a value for resource, please input a valid arn" self.message = "The assumed role ARN does not contain a value for resource, please input a valid ARN"
super().__init__(self.message) super().__init__(self.message)

View File

@@ -0,0 +1,57 @@
from typing import Optional
from pydantic import BaseModel
from prowler.providers.aws.lib.arn.error import RoleArnParsingFailedMissingFields
class ARN(BaseModel):
partition: str
service: str
region: Optional[str] # In IAM ARN's do not have region
account_id: str
resource: str
resource_type: str
def __init__(self, arn):
# Validate the ARN
## Check that arn starts with arn
if not arn.startswith("arn:"):
raise RoleArnParsingFailedMissingFields
## Retrieve fields
arn_elements = arn.split(":", 5)
data = {
"partition": arn_elements[1],
"service": arn_elements[2],
"region": arn_elements[3] if arn_elements[3] != "" else None,
"account_id": arn_elements[4],
"resource": arn_elements[5],
"resource_type": get_arn_resource_type(arn, arn_elements[2]),
}
if "/" in data["resource"]:
data["resource"] = data["resource"].split("/", 1)[1]
elif ":" in data["resource"]:
data["resource"] = data["resource"].split(":", 1)[1]
# Calls Pydantic's BaseModel __init__
super().__init__(**data)
def get_arn_resource_type(arn, service):
if service == "s3":
resource_type = "bucket"
elif service == "sns":
resource_type = "topic"
elif service == "sqs":
resource_type = "queue"
elif service == "apigateway":
split_parts = arn.split(":")[5].split("/")
if "integration" in split_parts and "responses" in split_parts:
resource_type = "restapis-resources-methods-integration-response"
elif "documentation" in split_parts and "parts" in split_parts:
resource_type = "restapis-documentation-parts"
else:
resource_type = arn.split(":")[5].split("/")[1]
else:
resource_type = arn.split(":")[5].split("/")[0]
return resource_type

View File

@@ -0,0 +1,59 @@
import sys
from boto3 import session
from colorama import Fore, Style
from prowler.lib.logger import logger
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_STS_GLOBAL_ENDPOINT_REGION = "us-east-1"
def validate_aws_credentials(session: session, input_regions: list) -> dict:
try:
# For a valid STS GetCallerIdentity we have to use the right AWS Region
if input_regions is None or len(input_regions) == 0:
if session.region_name is not None:
aws_region = session.region_name
else:
# If there is no region set passed with -f/--region
# we use the Global STS Endpoint Region, us-east-1
aws_region = AWS_STS_GLOBAL_ENDPOINT_REGION
else:
# Get the first region passed to the -f/--region
aws_region = input_regions[0]
validate_credentials_client = session.client("sts", aws_region)
caller_identity = validate_credentials_client.get_caller_identity()
# Include the region where the caller_identity has validated the credentials
caller_identity["region"] = aws_region
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
else:
return caller_identity
def print_aws_credentials(audit_info: AWS_Audit_Info):
# Beautify audited regions, set "all" if there is no filter region
regions = (
", ".join(audit_info.audited_regions)
if audit_info.audited_regions is not None
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
profile = audit_info.profile if audit_info.profile is not None else "default"
report = f"""
This report is being generated using credentials below:
AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL}
AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL}
Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL}
"""
# If -A is set, print Assumed Role ARN
if audit_info.assumed_role_info.role_arn is not None:
report += f"""Assumed Role ARN: {Fore.YELLOW}[{audit_info.assumed_role_info.role_arn}]{Style.RESET_ALL}
"""
print(report)

View File

@@ -0,0 +1,40 @@
import sys
from boto3 import client
from prowler.lib.logger import logger
from prowler.providers.aws.lib.audit_info.models import AWS_Organizations_Info
def get_organizations_metadata(
metadata_account: str, assumed_credentials: dict
) -> AWS_Organizations_Info:
try:
organizations_client = client(
"organizations",
aws_access_key_id=assumed_credentials["Credentials"]["AccessKeyId"],
aws_secret_access_key=assumed_credentials["Credentials"]["SecretAccessKey"],
aws_session_token=assumed_credentials["Credentials"]["SessionToken"],
)
organizations_metadata = organizations_client.describe_account(
AccountId=metadata_account
)
list_tags_for_resource = organizations_client.list_tags_for_resource(
ResourceId=metadata_account
)
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
sys.exit(1)
else:
# Convert Tags dictionary to String
account_details_tags = ""
for tag in list_tags_for_resource["Tags"]:
account_details_tags += tag["Key"] + ":" + tag["Value"] + ","
organizations_info = AWS_Organizations_Info(
account_details_email=organizations_metadata["Account"]["Email"],
account_details_name=organizations_metadata["Account"]["Name"],
account_details_arn=organizations_metadata["Account"]["Arn"],
account_details_org=organizations_metadata["Account"]["Arn"].split("/")[1],
account_details_tags=account_details_tags,
)
return organizations_info

View File

@@ -14,6 +14,7 @@ from prowler.config.config import (
output_file_timestamp, output_file_timestamp,
) )
from prowler.lib.logger import logger from prowler.lib.logger import logger
from prowler.providers.aws.lib.arn.models import get_arn_resource_type
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
@@ -153,22 +154,8 @@ def create_inventory_table(resources: list, resources_in_region: dict) -> dict:
services[service] = 0 services[service] = 0
services[service] += 1 services[service] += 1
if service == "s3": resource_type = get_arn_resource_type(resource["arn"], service)
resource_type = "bucket"
elif service == "sns":
resource_type = "topic"
elif service == "sqs":
resource_type = "queue"
elif service == "apigateway":
split_parts = resource["arn"].split(":")[5].split("/")
if "integration" in split_parts and "responses" in split_parts:
resource_type = "restapis-resources-methods-integration-response"
elif "documentation" in split_parts and "parts" in split_parts:
resource_type = "restapis-documentation-parts"
else:
resource_type = resource["arn"].split(":")[5].split("/")[1]
else:
resource_type = resource["arn"].split(":")[5].split("/")[0]
if service not in resources_type: if service not in resources_type:
resources_type[service] = {} resources_type[service] = {}
if resource_type not in resources_type[service]: if resource_type not in resources_type[service]:

View File

@@ -1,7 +1,5 @@
import sys import sys
from arnparse import arnparse
from boto3 import client, session
from botocore.config import Config from botocore.config import Config
from colorama import Fore, Style from colorama import Fore, Style
@@ -12,12 +10,15 @@ from prowler.providers.aws.aws_provider import (
get_checks_from_input_arn, get_checks_from_input_arn,
get_regions_from_audit_resources, get_regions_from_audit_resources,
) )
from prowler.providers.aws.lib.arn.arn import arn_parsing from prowler.providers.aws.lib.arn.arn import parse_iam_credentials_arn
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.lib.audit_info.models import ( from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info, AWS_Credentials
AWS_Audit_Info, from prowler.providers.aws.lib.credentials.credentials import (
AWS_Credentials, print_aws_credentials,
AWS_Organizations_Info, validate_aws_credentials,
)
from prowler.providers.aws.lib.organizations.organizations import (
get_organizations_metadata,
) )
from prowler.providers.aws.lib.resource_api_tagging.resource_api_tagging import ( from prowler.providers.aws.lib.resource_api_tagging.resource_api_tagging import (
get_tagged_resources, get_tagged_resources,
@@ -34,39 +35,6 @@ class Audit_Info:
def __init__(self): def __init__(self):
logger.info("Setting Audit Info ...") logger.info("Setting Audit Info ...")
def validate_credentials(self, validate_session: session) -> dict:
try:
validate_credentials_client = validate_session.client("sts")
caller_identity = validate_credentials_client.get_caller_identity()
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
sys.exit(1)
else:
return caller_identity
def print_aws_credentials(self, audit_info: AWS_Audit_Info):
# Beautify audited regions, set "all" if there is no filter region
regions = (
", ".join(audit_info.audited_regions)
if audit_info.audited_regions is not None
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
profile = audit_info.profile if audit_info.profile is not None else "default"
report = f"""
This report is being generated using credentials below:
AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL}
AWS Account: {Fore.YELLOW}[{audit_info.audited_account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{audit_info.audited_user_id}]{Style.RESET_ALL}
Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESET_ALL}
"""
# If -A is set, print Assumed Role ARN
if audit_info.assumed_role_info.role_arn is not None:
report += f"""Assumed Role ARN: {Fore.YELLOW}[{audit_info.assumed_role_info.role_arn}]{Style.RESET_ALL}
"""
print(report)
def print_gcp_credentials(self, audit_info: GCP_Audit_Info): def print_gcp_credentials(self, audit_info: GCP_Audit_Info):
# Beautify audited profile, set "default" if there is no profile set # Beautify audited profile, set "default" if there is no profile set
try: try:
@@ -100,43 +68,6 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
""" """
print(report) print(report)
def get_organizations_metadata(
self, metadata_account: str, assumed_credentials: dict
) -> AWS_Organizations_Info:
try:
organizations_client = client(
"organizations",
aws_access_key_id=assumed_credentials["Credentials"]["AccessKeyId"],
aws_secret_access_key=assumed_credentials["Credentials"][
"SecretAccessKey"
],
aws_session_token=assumed_credentials["Credentials"]["SessionToken"],
)
organizations_metadata = organizations_client.describe_account(
AccountId=metadata_account
)
list_tags_for_resource = organizations_client.list_tags_for_resource(
ResourceId=metadata_account
)
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
sys.exit(1)
else:
# Convert Tags dictionary to String
account_details_tags = ""
for tag in list_tags_for_resource["Tags"]:
account_details_tags += tag["Key"] + ":" + tag["Value"] + ","
organizations_info = AWS_Organizations_Info(
account_details_email=organizations_metadata["Account"]["Email"],
account_details_name=organizations_metadata["Account"]["Name"],
account_details_arn=organizations_metadata["Account"]["Arn"],
account_details_org=organizations_metadata["Account"]["Arn"].split("/")[
1
],
account_details_tags=account_details_tags,
)
return organizations_info
def set_aws_audit_info(self, arguments) -> AWS_Audit_Info: def set_aws_audit_info(self, arguments) -> AWS_Audit_Info:
""" """
set_aws_audit_info returns the AWS_Audit_Info set_aws_audit_info returns the AWS_Audit_Info
@@ -188,7 +119,9 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
current_audit_info.original_session = aws_provider.aws_session current_audit_info.original_session = aws_provider.aws_session
logger.info("Validating credentials ...") logger.info("Validating credentials ...")
# Verificate if we have valid credentials # Verificate if we have valid credentials
caller_identity = self.validate_credentials(current_audit_info.original_session) caller_identity = validate_aws_credentials(
current_audit_info.original_session, input_regions
)
logger.info("Credentials validated") logger.info("Credentials validated")
logger.info(f"Original caller identity UserId: {caller_identity['UserId']}") logger.info(f"Original caller identity UserId: {caller_identity['UserId']}")
@@ -197,7 +130,7 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
current_audit_info.audited_account = caller_identity["Account"] current_audit_info.audited_account = caller_identity["Account"]
current_audit_info.audited_identity_arn = caller_identity["Arn"] current_audit_info.audited_identity_arn = caller_identity["Arn"]
current_audit_info.audited_user_id = caller_identity["UserId"] current_audit_info.audited_user_id = caller_identity["UserId"]
current_audit_info.audited_partition = arnparse( current_audit_info.audited_partition = parse_iam_credentials_arn(
caller_identity["Arn"] caller_identity["Arn"]
).partition ).partition
@@ -210,8 +143,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
# Check if role arn is valid # Check if role arn is valid
try: try:
# this returns the arn already parsed, calls arnparse, into a dict to be used when it is needed to access its fields # this returns the arn already parsed into a dict to be used when it is needed to access its fields
role_arn_parsed = arn_parsing( role_arn_parsed = parse_iam_credentials_arn(
current_audit_info.assumed_role_info.role_arn current_audit_info.assumed_role_info.role_arn
) )
@@ -226,10 +159,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
assumed_credentials = assume_role( assumed_credentials = assume_role(
aws_provider.aws_session, aws_provider.role_info aws_provider.aws_session, aws_provider.role_info
) )
current_audit_info.organizations_metadata = ( current_audit_info.organizations_metadata = get_organizations_metadata(
self.get_organizations_metadata( current_audit_info.audited_account, assumed_credentials
current_audit_info.audited_account, assumed_credentials
)
) )
logger.info("Organizations metadata retrieved") logger.info("Organizations metadata retrieved")
@@ -243,8 +174,8 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
# Check if role arn is valid # Check if role arn is valid
try: try:
# this returns the arn already parsed, calls arnparse, into a dict to be used when it is needed to access its fields # this returns the arn already parsed into a dict to be used when it is needed to access its fields
role_arn_parsed = arn_parsing( role_arn_parsed = parse_iam_credentials_arn(
current_audit_info.assumed_role_info.role_arn current_audit_info.assumed_role_info.role_arn
) )
@@ -293,7 +224,7 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
current_audit_info.profile_region = "us-east-1" current_audit_info.profile_region = "us-east-1"
if not arguments.get("only_logs"): if not arguments.get("only_logs"):
self.print_aws_credentials(current_audit_info) print_aws_credentials(current_audit_info)
# Parse Scan Tags # Parse Scan Tags
if arguments.get("resource_tags"): if arguments.get("resource_tags"):

View File

@@ -26,7 +26,6 @@ version = "3.5.1"
[tool.poetry.dependencies] [tool.poetry.dependencies]
alive-progress = "3.1.1" alive-progress = "3.1.1"
arnparse = "0.0.2"
azure-identity = "1.12.0" azure-identity = "1.12.0"
azure-mgmt-authorization = "3.0.0" azure-mgmt-authorization = "3.0.0"
azure-mgmt-security = "5.0.0" azure-mgmt-security = "5.0.0"

View File

@@ -1,30 +1,255 @@
import sure # noqa import sure # noqa
from pytest import raises
from prowler.providers.aws.lib.arn.arn import arn_parsing, is_valid_arn from prowler.providers.aws.lib.arn.arn import is_valid_arn, parse_iam_credentials_arn
from prowler.providers.aws.lib.arn.error import (
RoleArnParsingEmptyResource,
RoleArnParsingFailedMissingFields,
RoleArnParsingIAMRegionNotEmpty,
RoleArnParsingInvalidAccountID,
RoleArnParsingInvalidResourceType,
RoleArnParsingPartitionEmpty,
RoleArnParsingServiceNotIAMnorSTS,
)
from prowler.providers.aws.lib.arn.models import ARN
ACCOUNT_ID = "123456789012" ACCOUNT_ID = "123456789012"
RESOURCE_TYPE = "role" RESOURCE_TYPE_ROLE = "role"
RESOUCE_TYPE_USER = "user"
IAM_ROLE = "test-role" IAM_ROLE = "test-role"
IAM_SERVICE = "iam"
COMMERCIAL_PARTITION = "aws"
CHINA_PARTITION = "aws-cn"
GOVCLOUD_PARTITION = "aws-us-gov"
class Test_ARN_Parsing: class Test_ARN_Parsing:
def test_arn_parsing(self): def test_ARN_model(self):
# https://gist.github.com/cmawhorter/80bf94f12bf7516d50a7d61ed28859d3
test_cases = [
"arn:aws:elasticbeanstalk:us-east-1:123456789012:environment/My App/MyEnvironment",
"arn:aws:iam::123456789012:user/David",
"arn:aws:rds:eu-west-1:123456789012:db:mysql-db",
"arn:aws:s3:::my_corporate_bucket/exampleobject.png",
"arn:aws:artifact:::report-package/Certifications and Attestations/SOC/*",
"arn:aws:artifact:::report-package/Certifications and Attestations/ISO/*",
"arn:aws:artifact:::report-package/Certifications and Attestations/PCI/*",
# "arn:aws:autoscaling:us-east-1:123456789012:scalingPolicy:c7a27f55-d35e-4153-b044-8ca9155fc467:autoScalingGroupName/my-test-asg1:policyName/my-scaleout-policy",
"arn:aws:acm:us-east-1:123456789012:certificate/12345678-1234-1234-1234-123456789012",
"arn:aws:cloudformation:us-east-1:123456789012:stack/MyProductionStack/abc9dbf0-43c2-11e3-a6e8-50fa526be49c",
"arn:aws:cloudformation:us-east-1:123456789012:changeSet/MyProductionChangeSet/abc9dbf0-43c2-11e3-a6e8-50fa526be49c",
"arn:aws:cloudsearch:us-east-1:123456789012:domain/imdb-movies",
"arn:aws:cloudtrail:us-east-1:123456789012:trail/mytrailname",
"arn:aws:events:us-east-1:*:*",
"arn:aws:events:us-east-1:account-id:*",
"arn:aws:events:us-east-1:account-id:rule/rule_name",
"arn:aws:logs:us-east-1:*:*",
"arn:aws:logs:us-east-1:account-id:*",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name:*",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name_prefix*",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name:log-stream:log_stream_name",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name:log-stream:log_stream_name_prefix*",
"arn:aws:logs:us-east-1:account-id:log-group:log_group_name_prefix*:log-stream:log_stream_name_prefix*",
"arn:aws:codebuild:us-east-1:123456789012:project/my-demo-project",
"arn:aws:codebuild:us-east-1:123456789012:build/my-demo-project:7b7416ae-89b4-46cc-8236-61129df660ad",
"arn:aws:codecommit:us-east-1:123456789012:MyDemoRepo",
"arn:aws:codedeploy:us-east-1:123456789012:application:WordPress_App",
"arn:aws:codedeploy:us-east-1:123456789012:instance/AssetTag*",
"arn:aws:config:us-east-1:123456789012:config-rule/MyConfigRule",
"arn:aws:codepipeline:us-east-1:123456789012:MyDemoPipeline",
"arn:aws:directconnect:us-east-1:123456789012:dxcon/dxcon-fgase048",
"arn:aws:directconnect:us-east-1:123456789012:dxvif/dxvif-fgrb110x",
"arn:aws:dynamodb:us-east-1:123456789012:table/books_table",
"arn:aws:ecr:us-east-1:123456789012:repository/my-repository",
"arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
"arn:aws:ecs:us-east-1:123456789012:container-instance/403125b0-555c-4473-86b5-65982db28a6d",
"arn:aws:ecs:us-east-1:123456789012:task-definition/hello_world:8",
"arn:aws:ecs:us-east-1:123456789012:service/sample-webapp",
"arn:aws:ecs:us-east-1:123456789012:task/1abf0f6d-a411-4033-b8eb-a4eed3ad252a",
"arn:aws:ecs:us-east-1:123456789012:container/476e7c41-17f2-4c17-9d14-412566202c8a",
"arn:aws:ec2:us-east-1:123456789012:dedicated-host/h-12345678",
"arn:aws:ec2:us-east-1::image/ami-1a2b3c4d",
"arn:aws:ec2:us-east-1:123456789012:instance/*",
"arn:aws:ec2:us-east-1:123456789012:volume/*",
"arn:aws:ec2:us-east-1:123456789012:volume/vol-1a2b3c4d",
"arn:aws:elasticbeanstalk:us-east-1:123456789012:application/My App",
"arn:aws:elasticbeanstalk:us-east-1:123456789012:applicationversion/My App/My Version",
"arn:aws:elasticbeanstalk:us-east-1:123456789012:environment/My App/MyEnvironment",
"arn:aws:elasticbeanstalk:us-east-1::solutionstack/32bit Amazon Linux running Tomcat 7",
"arn:aws:elasticbeanstalk:us-east-1:123456789012:configurationtemplate/My App/My Template",
"arn:aws:elasticfilesystem:us-east-1:123456789012:file-system-id/fs12345678",
"arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/app/my-load-balancer/50dc6c495c0c9188",
"arn:aws:elasticloadbalancing:us-east-1:123456789012:listener/app/my-load-balancer/50dc6c495c0c9188/f2f7dc8efc522ab2",
"arn:aws:elasticloadbalancing:us-east-1:123456789012:listener-rule/app/my-load-balancer/50dc6c495c0c9188/f2f7dc8efc522ab2/9683b2d02a6cabee",
"arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/my-targets/73e2d6bc24d8a067",
"arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/my-load-balancer",
"arn:aws:elastictranscoder:us-east-1:123456789012:preset/*",
"arn:aws:elasticache:us-west-2:123456789012:cluster:myCluster",
"arn:aws:elasticache:us-west-2:123456789012:snapshot:mySnapshot",
"arn:aws:es:us-east-1:123456789012:domain/streaming-logs",
"arn:aws:glacier:us-east-1:123456789012:vaults/examplevault",
"arn:aws:glacier:us-east-1:123456789012:vaults/example*",
"arn:aws:glacier:us-east-1:123456789012:vaults/*",
"arn:aws:health:us-east-1::event/AWS_EC2_EXAMPLE_ID",
"arn:aws:health:us-east-1:123456789012:entity/AVh5GGT7ul1arKr1sE1K",
"arn:aws:iam::123456789012:root",
"arn:aws:iam::123456789012:user/Bob",
"arn:aws:iam::123456789012:user/division_abc/subdivision_xyz/Bob",
"arn:aws:iam::123456789012:group/Developers",
"arn:aws:iam::123456789012:group/division_abc/subdivision_xyz/product_A/Developers",
"arn:aws:iam::123456789012:role/S3Access",
"arn:aws:iam::123456789012:role/application_abc/component_xyz/S3Access",
"arn:aws:iam::123456789012:policy/UsersManageOwnCredentials",
"arn:aws:iam::123456789012:policy/division_abc/subdivision_xyz/UsersManageOwnCredentials",
"arn:aws:iam::123456789012:instance-profile/Webserver",
"arn:aws:sts::123456789012:federated-user/Bob",
"arn:aws:sts::123456789012:assumed-role/Accounting-Role/Mary",
"arn:aws:iam::123456789012:mfa/BobJonesMFA",
"arn:aws:iam::123456789012:server-certificate/ProdServerCert",
"arn:aws:iam::123456789012:server-certificate/division_abc/subdivision_xyz/ProdServerCert",
"arn:aws:iam::123456789012:saml-provider/ADFSProvider",
"arn:aws:iam::123456789012:oidc-provider/GoogleProvider",
"arn:aws:iot:your-region:123456789012:cert/123a456b789c123d456e789f123a456b789c123d456e789f123a456b789c123c456d7",
"arn:aws:iot::123456789012:policy/MyIoTPolicy",
"arn:aws:iot:your-region:123456789012:rule/MyIoTRule",
"arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012",
"arn:aws:kms:us-east-1:123456789012:alias/example-alias",
"arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream-name",
"arn:aws:kinesis:us-east-1:123456789012:stream/example-stream-name",
"arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords",
"arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords:your alias",
"arn:aws:lambda:us-east-1:123456789012:function:ProcessKinesisRecords:1.0",
"arn:aws:lambda:us-east-1:123456789012:event-source-mappings:kinesis-stream-arn",
"arn:aws:machinelearning:us-east-1:123456789012:datasource/my-datasource-1",
"arn:aws:machinelearning:us-east-1:123456789012:mlmodel/my-mlmodel",
"arn:aws:machinelearning:us-east-1:123456789012:batchprediction/my-batchprediction",
"arn:aws:machinelearning:us-east-1:123456789012:evaluation/my-evaluation",
"arn:aws:polly:us-east-1:123456789012:lexicon/myLexicon",
"arn:aws:redshift:us-east-1:123456789012:cluster:my-cluster",
"arn:aws:redshift:us-east-1:123456789012:my-cluster/my-dbuser-name",
"arn:aws:redshift:us-east-1:123456789012:parametergroup:my-parameter-group",
"arn:aws:redshift:us-east-1:123456789012:securitygroup:my-public-group",
"arn:aws:redshift:us-east-1:123456789012:snapshot:my-cluster/my-snapshot20130807",
"arn:aws:redshift:us-east-1:123456789012:subnetgroup:my-subnet-10 ",
"arn:aws:rds:us-east-1:123456789012:db:mysql-db-instance1",
"arn:aws:rds:us-east-1:123456789012:snapshot:my-snapshot2",
"arn:aws:rds:us-east-1:123456789012:cluster:my-cluster1",
"arn:aws:rds:us-east-1:123456789012:cluster-snapshot:cluster1-snapshot7",
"arn:aws:rds:us-east-1:123456789012:og:mysql-option-group1",
"arn:aws:rds:us-east-1:123456789012:pg:mysql-repl-pg1",
"arn:aws:rds:us-east-1:123456789012:cluster-pg:aurora-pg3",
"arn:aws:rds:us-east-1:123456789012:secgrp:dev-secgrp2",
"arn:aws:rds:us-east-1:123456789012:subgrp:prod-subgrp1",
"arn:aws:rds:us-east-1:123456789012:es:monitor-events2",
"arn:aws:route53:::hostedzone/Z148QEXAMPLE8V",
"arn:aws:route53:::change/C2RDJ5EXAMPLE2",
"arn:aws:route53:::change/*",
"arn:aws:ssm:us-east-1:123456789012:document/highAvailabilityServerSetup",
"arn:aws:sns:*:123456789012:my_corporate_topic",
"arn:aws:sns:us-east-1:123456789012:my_corporate_topic:02034b43-fefa-4e07-a5eb-3be56f8c54ce",
"arn:aws:sqs:us-east-1:123456789012:queue1",
"arn:aws:s3:::my_corporate_bucket",
"arn:aws:s3:::my_corporate_bucket/exampleobject.png",
"arn:aws:s3:::my_corporate_bucket/*",
"arn:aws:s3:::my_corporate_bucket/Development/*",
"arn:aws:swf:us-east-1:123456789012:/domain/department1",
"arn:aws:swf:*:123456789012:/domain/*",
"arn:aws:states:us-east-1:123456789012:activity:HelloActivity",
"arn:aws:states:us-east-1:123456789012:stateMachine:HelloStateMachine",
"arn:aws:states:us-east-1:123456789012:execution:HelloStateMachine:HelloStateMachineExecution",
"arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B",
"arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/volume/vol-1122AABB",
"arn:aws:storagegateway:us-east-1:123456789012:tape/AMZNC8A26D",
"arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/target/iqn.1997-05.com.amazon:vol-1122AABB",
"arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B/device/AMZN_SGW-FF22CCDD_TAPEDRIVE_00010",
"arn:aws:trustedadvisor:*:123456789012:checks/fault_tolerance/BueAdJ7NrP",
"arn:aws:waf::123456789012:rule/41b5b052-1e4a-426b-8149-3595be6342c2",
"arn:aws:waf::123456789012:webacl/3bffd3ed-fa2e-445e-869f-a6a7cf153fd3",
"arn:aws:waf::123456789012:ipset/3f74bd8c-f046-4970-a1a7-41aa52e05480",
"arn:aws:waf::123456789012:bytematchset/d131bc0b-57be-4536-af1d-4894fd28acc4",
"arn:aws:waf::123456789012:sqlinjectionset/2be79d6f-2f41-4c9b-8192-d719676873f0",
"arn:aws:waf::123456789012:changetoken/03ba2197-fc98-4ac0-a67d-5b839762b16b",
"arn:aws:iam::123456789012:user/Development/product_1234/*",
"arn:aws:s3:::my_corporate_bucket/*",
"arn:aws:s3:::my_corporate_bucket/Development/*",
]
# For now we are only testing that the ARN library does not raise any exception with the above list of ARNs.
for arn in test_cases:
_ = ARN(arn)
def test_iam_credentials_arn_parsing(self):
test_cases = [ test_cases = [
{ {
"input_arn": f"arn:aws:iam::{ACCOUNT_ID}:{RESOURCE_TYPE}/{IAM_ROLE}", "input_arn": f"arn:aws:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}",
"expected": { "expected": {
"partition": "aws", "partition": COMMERCIAL_PARTITION,
"service": "iam", "service": IAM_SERVICE,
"region": None, "region": None,
"account_id": ACCOUNT_ID, "account_id": ACCOUNT_ID,
"resource_type": RESOURCE_TYPE, "resource_type": RESOURCE_TYPE_ROLE,
"resource": IAM_ROLE, "resource": IAM_ROLE,
}, },
} },
{
"input_arn": f"arn:aws:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}",
"expected": {
"partition": COMMERCIAL_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": RESOUCE_TYPE_USER,
"resource": IAM_ROLE,
},
},
{
"input_arn": f"arn:{CHINA_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}",
"expected": {
"partition": CHINA_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": RESOURCE_TYPE_ROLE,
"resource": IAM_ROLE,
},
},
{
"input_arn": f"arn:{CHINA_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}",
"expected": {
"partition": CHINA_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": RESOUCE_TYPE_USER,
"resource": IAM_ROLE,
},
},
{
"input_arn": f"arn:{GOVCLOUD_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOURCE_TYPE_ROLE}/{IAM_ROLE}",
"expected": {
"partition": GOVCLOUD_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": RESOURCE_TYPE_ROLE,
"resource": IAM_ROLE,
},
},
{
"input_arn": f"arn:{GOVCLOUD_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:{RESOUCE_TYPE_USER}/{IAM_ROLE}",
"expected": {
"partition": GOVCLOUD_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": RESOUCE_TYPE_USER,
"resource": IAM_ROLE,
},
},
] ]
for test in test_cases: for test in test_cases:
input_arn = test["input_arn"] input_arn = test["input_arn"]
parsed_arn = arn_parsing(input_arn) parsed_arn = parse_iam_credentials_arn(input_arn)
parsed_arn.partition.should.equal(test["expected"]["partition"]) parsed_arn.partition.should.equal(test["expected"]["partition"])
parsed_arn.service.should.equal(test["expected"]["service"]) parsed_arn.service.should.equal(test["expected"]["service"])
parsed_arn.region.should.equal(test["expected"]["region"]) parsed_arn.region.should.equal(test["expected"]["region"])
@@ -32,6 +257,59 @@ class Test_ARN_Parsing:
parsed_arn.resource_type.should.equal(test["expected"]["resource_type"]) parsed_arn.resource_type.should.equal(test["expected"]["resource_type"])
parsed_arn.resource.should.equal(test["expected"]["resource"]) parsed_arn.resource.should.equal(test["expected"]["resource"])
def test_iam_credentials_arn_parsing_raising_RoleArnParsingFailedMissingFields(
self,
):
input_arn = ""
with raises(RoleArnParsingFailedMissingFields) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingFailedMissingFields
def test_iam_credentials_arn_parsing_raising_RoleArnParsingIAMRegionNotEmpty(self):
input_arn = "arn:aws:iam:eu-west-1:111111111111:user/prowler"
with raises(RoleArnParsingIAMRegionNotEmpty) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingIAMRegionNotEmpty
def test_iam_credentials_arn_parsing_raising_RoleArnParsingPartitionEmpty(self):
input_arn = "arn::iam::111111111111:user/prowler"
with raises(RoleArnParsingPartitionEmpty) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingPartitionEmpty
def test_iam_credentials_arn_parsing_raising_RoleArnParsingServiceNotIAM(self):
input_arn = "arn:aws:s3::111111111111:user/prowler"
with raises(RoleArnParsingServiceNotIAMnorSTS) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingServiceNotIAMnorSTS
def test_iam_credentials_arn_parsing_raising_RoleArnParsingInvalidAccountID(self):
input_arn = "arn:aws:iam::AWS_ACCOUNT_ID:user/prowler"
with raises(RoleArnParsingInvalidAccountID) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingInvalidAccountID
def test_iam_credentials_arn_parsing_raising_RoleArnParsingInvalidResourceType(
self,
):
input_arn = "arn:aws:iam::111111111111:account/prowler"
with raises(RoleArnParsingInvalidResourceType) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingInvalidResourceType
def test_iam_credentials_arn_parsing_raising_RoleArnParsingEmptyResource(self):
input_arn = "arn:aws:iam::111111111111:role/"
with raises(RoleArnParsingEmptyResource) as error:
parse_iam_credentials_arn(input_arn)
assert error._excinfo[0] == RoleArnParsingEmptyResource
def test_is_valid_arn(self): def test_is_valid_arn(self):
assert is_valid_arn("arn:aws:iam::012345678910:user/test") assert is_valid_arn("arn:aws:iam::012345678910:user/test")
assert is_valid_arn("arn:aws-cn:ec2:us-east-1:123456789012:vpc/vpc-12345678") assert is_valid_arn("arn:aws-cn:ec2:us-east-1:123456789012:vpc/vpc-12345678")

View File

@@ -0,0 +1,312 @@
import re
import boto3
import botocore
from mock import patch
from moto import mock_iam, mock_sts
from prowler.providers.aws.lib.arn.arn import parse_iam_credentials_arn
from prowler.providers.aws.lib.credentials.credentials import validate_aws_credentials
AWS_ACCOUNT_NUMBER = "123456789012"
# Mocking GetCallerIdentity for China and GovCloud
make_api_call = botocore.client.BaseClient._make_api_call
def mock_get_caller_identity_china(self, operation_name, kwarg):
if operation_name == "GetCallerIdentity":
return {
"UserId": "XXXXXXXXXXXXXXXXXXXXX",
"Account": AWS_ACCOUNT_NUMBER,
"Arn": f"arn:aws-cn:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
}
return make_api_call(self, operation_name, kwarg)
def mock_get_caller_identity_gov_cloud(self, operation_name, kwarg):
if operation_name == "GetCallerIdentity":
return {
"UserId": "XXXXXXXXXXXXXXXXXXXXX",
"Account": AWS_ACCOUNT_NUMBER,
"Arn": f"arn:aws-us-gov:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
}
return make_api_call(self, operation_name, kwarg)
class Test_AWS_Credentials:
@mock_sts
@mock_iam
def test_validate_credentials_commercial_partition_with_regions(self):
# AWS Region for AWS COMMERCIAL
aws_region = "eu-west-1"
aws_partition = "aws"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=aws_region,
)
get_caller_identity = validate_aws_credentials(session, [aws_region])
assert get_caller_identity["region"] == aws_region
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
def test_validate_credentials_commercial_partition_with_regions_none_and_profile_region_so_profile_region(
self,
):
# AWS Region for AWS COMMERCIAL
aws_region = "eu-west-1"
aws_partition = "aws"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=aws_region,
)
get_caller_identity = validate_aws_credentials(session, None)
assert get_caller_identity["region"] == aws_region
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
def test_validate_credentials_commercial_partition_with_0_regions_and_profile_region_so_profile_region(
self,
):
# AWS Region for AWS COMMERCIAL
aws_region = "eu-west-1"
aws_partition = "aws"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=aws_region,
)
get_caller_identity = validate_aws_credentials(session, [])
assert get_caller_identity["region"] == aws_region
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
def test_validate_credentials_commercial_partition_without_regions_and_profile_region_so_us_east_1(
self,
):
# AWS Region for AWS COMMERCIAL
aws_region = "eu-west-1"
aws_partition = "aws"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=None,
)
get_caller_identity = validate_aws_credentials(session, [])
assert get_caller_identity["region"] == "us-east-1"
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
def test_validate_credentials_china_partition_without_regions_and_profile_region_so_us_east_1(
self,
):
# AWS Region for AWS COMMERCIAL
aws_region = "eu-west-1"
aws_partition = "aws"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=None,
)
get_caller_identity = validate_aws_credentials(session, [])
assert get_caller_identity["region"] == "us-east-1"
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
@patch(
"botocore.client.BaseClient._make_api_call", new=mock_get_caller_identity_china
)
def test_validate_credentials_china_partition(self):
# AWS Region for AWS CHINA
aws_region = "cn-north-1"
aws_partition = "aws-cn"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=aws_region,
)
get_caller_identity = validate_aws_credentials(session, [aws_region])
# To use GovCloud or China it is either required:
# - Set the AWS profile region with a valid partition region
# - Use the -f/--region with a valid partition region
assert get_caller_identity["region"] == aws_region
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER
@mock_sts
@mock_iam
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_get_caller_identity_gov_cloud,
)
def test_validate_credentials_gov_cloud_partition(self):
# AWS Region for US GOV CLOUD
aws_region = "us-gov-east-1"
aws_partition = "aws-us-gov"
# Create a mock IAM user
iam_client = boto3.client("iam", region_name=aws_region)
iam_user = iam_client.create_user(UserName="test-user")["User"]
# Create a mock IAM access keys
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[
"AccessKey"
]
access_key_id = access_key["AccessKeyId"]
secret_access_key = access_key["SecretAccessKey"]
# Create AWS session to validate
session = boto3.session.Session(
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
region_name=aws_region,
)
get_caller_identity = validate_aws_credentials(session, [aws_region])
# To use GovCloud or China it is either required:
# - Set the AWS profile region with a valid partition region
# - Use the -f/--region with a valid partition region
assert get_caller_identity["region"] == aws_region
caller_identity_arn = parse_iam_credentials_arn(get_caller_identity["Arn"])
assert caller_identity_arn.partition == aws_partition
assert caller_identity_arn.region is None
assert caller_identity_arn.resource == "test-user"
assert caller_identity_arn.resource_type == "user"
assert re.match("[0-9a-zA-Z]{20}", get_caller_identity["UserId"])
assert get_caller_identity["Account"] == AWS_ACCOUNT_NUMBER

View File

@@ -0,0 +1,61 @@
import json
import boto3
import sure # noqa
from moto import mock_iam, mock_organizations, mock_sts
from prowler.providers.aws.lib.organizations.organizations import (
get_organizations_metadata,
)
AWS_ACCOUNT_NUMBER = "123456789012"
class Test_AWS_Organizations:
@mock_organizations
@mock_sts
@mock_iam
def test_organizations(self):
client = boto3.client("organizations", region_name="us-east-1")
iam_client = boto3.client("iam", region_name="us-east-1")
sts_client = boto3.client("sts", region_name="us-east-1")
mockname = "mock-account"
mockdomain = "moto-example.org"
mockemail = "@".join([mockname, mockdomain])
org_id = client.create_organization(FeatureSet="ALL")["Organization"]["Id"]
account_id = client.create_account(AccountName=mockname, Email=mockemail)[
"CreateAccountStatus"
]["AccountId"]
client.tag_resource(
ResourceId=account_id, Tags=[{"Key": "key", "Value": "value"}]
)
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"},
"Action": "sts:AssumeRole",
},
}
iam_role_arn = iam_client.role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=json.dumps(trust_policy_document),
)["Role"]["Arn"]
session_name = "new-session"
assumed_role = sts_client.assume_role(
RoleArn=iam_role_arn, RoleSessionName=session_name
)
org = get_organizations_metadata(account_id, assumed_role)
org.account_details_email.should.equal(mockemail)
org.account_details_name.should.equal(mockname)
org.account_details_arn.should.equal(
f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/{org_id}/{account_id}"
)
org.account_details_org.should.equal(org_id)
org.account_details_tags.should.equal("key:value,")

View File

@@ -1,15 +1,9 @@
import json
import boto3 import boto3
import botocore
import sure # noqa import sure # noqa
from boto3 import session
from mock import patch from mock import patch
from moto import ( from moto import mock_ec2, mock_resourcegroupstaggingapi
mock_ec2,
mock_iam,
mock_organizations,
mock_resourcegroupstaggingapi,
mock_sts,
)
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.azure.azure_provider import Azure_Provider from prowler.providers.azure.azure_provider import Azure_Provider
@@ -26,24 +20,8 @@ from prowler.providers.gcp.gcp_provider import GCP_Provider
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
EXAMPLE_AMI_ID = "ami-12c6146b" EXAMPLE_AMI_ID = "ami-12c6146b"
ACCOUNT_ID = 123456789012 AWS_ACCOUNT_NUMBER = "123456789012"
mock_current_audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=None,
audited_account="123456789012",
audited_identity_arn="arn:aws:iam::123456789012:user/test",
audited_user_id="test",
audited_partition="aws",
profile="default",
profile_region="eu-west-1",
credentials=None,
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
audit_metadata=None,
)
mock_azure_audit_info = Azure_Audit_Info( mock_azure_audit_info = Azure_Audit_Info(
credentials=None, credentials=None,
@@ -54,6 +32,31 @@ mock_azure_audit_info = Azure_Audit_Info(
mock_set_audit_info = Audit_Info() mock_set_audit_info = Audit_Info()
# Mocking GetCallerIdentity for China and GovCloud
make_api_call = botocore.client.BaseClient._make_api_call
def mock_get_caller_identity_china(self, operation_name, kwarg):
if operation_name == "GetCallerIdentity":
return {
"UserId": "XXXXXXXXXXXXXXXXXXXXX",
"Account": AWS_ACCOUNT_NUMBER,
"Arn": f"arn:aws-cn:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
}
return make_api_call(self, operation_name, kwarg)
def mock_get_caller_identity_gov_cloud(self, operation_name, kwarg):
if operation_name == "GetCallerIdentity":
return {
"UserId": "XXXXXXXXXXXXXXXXXXXXX",
"Account": AWS_ACCOUNT_NUMBER,
"Arn": f"arn:aws-us-gov:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
}
return make_api_call(self, operation_name, kwarg)
def mock_validate_credentials(*_): def mock_validate_credentials(*_):
caller_identity = { caller_identity = {
@@ -81,118 +84,60 @@ def mock_set_gcp_credentials(*_):
class Test_Set_Audit_Info: class Test_Set_Audit_Info:
@patch( # Mocked Audit Info
"prowler.providers.common.audit_info.current_audit_info", def set_mocked_audit_info(self):
new=mock_current_audit_info, audit_info = AWS_Audit_Info(
) session_config=None,
@mock_sts original_session=None,
@mock_iam audit_session=session.Session(
def test_validate_credentials(self): profile_name=None,
# Create a mock IAM user botocore_session=None,
iam_client = boto3.client("iam", region_name="us-east-1") ),
iam_user = iam_client.create_user(UserName="test-user")["User"] audited_account=AWS_ACCOUNT_NUMBER,
# Create a mock IAM access keys audited_user_id=None,
access_key = iam_client.create_access_key(UserName=iam_user["UserName"])[ audited_partition="aws",
"AccessKey" audited_identity_arn="arn:aws:iam::123456789012:user/test",
] profile=None,
access_key_id = access_key["AccessKeyId"] profile_region="eu-west-1",
secret_access_key = access_key["SecretAccessKey"] credentials=None,
# Create AWS session to validate assumed_role_info=None,
session = boto3.session.Session( audited_regions=["eu-west-2", "eu-west-1"],
aws_access_key_id=access_key_id, organizations_metadata=None,
aws_secret_access_key=secret_access_key, audit_resources=None,
region_name="us-east-1",
) )
audit_info = Audit_Info()
get_caller_identity = audit_info.validate_credentials(session)
get_caller_identity["Arn"].should.equal(iam_user["Arn"]) return audit_info
get_caller_identity["UserId"].should.equal(iam_user["UserId"])
# assert get_caller_identity["UserId"] == str(ACCOUNT_ID)
@patch( @patch(
"prowler.providers.common.audit_info.current_audit_info", "prowler.providers.common.audit_info.validate_aws_credentials",
new=mock_current_audit_info, new=mock_validate_credentials,
) )
@mock_organizations
@mock_sts
@mock_iam
def test_organizations(self):
client = boto3.client("organizations", region_name="us-east-1")
iam_client = boto3.client("iam", region_name="us-east-1")
sts_client = boto3.client("sts", region_name="us-east-1")
mockname = "mock-account"
mockdomain = "moto-example.org"
mockemail = "@".join([mockname, mockdomain])
org_id = client.create_organization(FeatureSet="ALL")["Organization"]["Id"]
account_id = client.create_account(AccountName=mockname, Email=mockemail)[
"CreateAccountStatus"
]["AccountId"]
client.tag_resource(
ResourceId=account_id, Tags=[{"Key": "key", "Value": "value"}]
)
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{account_id}:root".format(
account_id=ACCOUNT_ID
)
},
"Action": "sts:AssumeRole",
},
}
iam_role_arn = iam_client.role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=json.dumps(trust_policy_document),
)["Role"]["Arn"]
session_name = "new-session"
assumed_role = sts_client.assume_role(
RoleArn=iam_role_arn, RoleSessionName=session_name
)
audit_info = Audit_Info()
org = audit_info.get_organizations_metadata(account_id, assumed_role)
org.account_details_email.should.equal(mockemail)
org.account_details_name.should.equal(mockname)
org.account_details_arn.should.equal(
"arn:aws:organizations::{0}:account/{1}/{2}".format(
ACCOUNT_ID, org_id, account_id
)
)
org.account_details_org.should.equal(org_id)
org.account_details_tags.should.equal("key:value,")
@patch( @patch(
"prowler.providers.common.audit_info.current_audit_info", "prowler.providers.common.audit_info.print_aws_credentials",
new=mock_current_audit_info, new=mock_print_audit_credentials,
) )
@patch.object(Audit_Info, "validate_credentials", new=mock_validate_credentials)
@patch.object(Audit_Info, "print_aws_credentials", new=mock_print_audit_credentials)
def test_set_audit_info_aws(self): def test_set_audit_info_aws(self):
provider = "aws" with patch(
arguments = { "prowler.providers.common.audit_info.current_audit_info",
"profile": None, new=self.set_mocked_audit_info(),
"role": None, ):
"session_duration": None, provider = "aws"
"external_id": None, arguments = {
"regions": None, "profile": None,
"organizations_role": None, "role": None,
"subscriptions": None, "session_duration": None,
"az_cli_auth": None, "external_id": None,
"sp_env_auth": None, "regions": None,
"browser_auth": None, "organizations_role": None,
"managed_entity_auth": None, "subscriptions": None,
} "az_cli_auth": None,
"sp_env_auth": None,
"browser_auth": None,
"managed_entity_auth": None,
}
audit_info = set_provider_audit_info(provider, arguments) audit_info = set_provider_audit_info(provider, arguments)
assert isinstance(audit_info, AWS_Audit_Info) assert isinstance(audit_info, AWS_Audit_Info)
@patch( @patch(
"prowler.providers.common.audit_info.azure_audit_info", "prowler.providers.common.audit_info.azure_audit_info",
@@ -242,45 +187,48 @@ class Test_Set_Audit_Info:
@mock_resourcegroupstaggingapi @mock_resourcegroupstaggingapi
@mock_ec2 @mock_ec2
def test_get_tagged_resources(self): def test_get_tagged_resources(self):
client = boto3.client("ec2", region_name="eu-central-1") with patch(
instances = client.run_instances( "prowler.providers.common.audit_info.current_audit_info",
ImageId=EXAMPLE_AMI_ID, new=self.set_mocked_audit_info(),
MinCount=1, ) as mock_audit_info:
MaxCount=1, client = boto3.client("ec2", region_name="eu-central-1")
InstanceType="t2.micro", instances = client.run_instances(
TagSpecifications=[ ImageId=EXAMPLE_AMI_ID,
{ MinCount=1,
"ResourceType": "instance", MaxCount=1,
"Tags": [ InstanceType="t2.micro",
{"Key": "MY_TAG1", "Value": "MY_VALUE1"}, TagSpecifications=[
{"Key": "MY_TAG2", "Value": "MY_VALUE2"}, {
], "ResourceType": "instance",
}, "Tags": [
{ {"Key": "MY_TAG1", "Value": "MY_VALUE1"},
"ResourceType": "instance", {"Key": "MY_TAG2", "Value": "MY_VALUE2"},
"Tags": [{"Key": "ami", "Value": "test"}], ],
}, },
], {
) "ResourceType": "instance",
instance_id = instances["Instances"][0]["InstanceId"] "Tags": [{"Key": "ami", "Value": "test"}],
image_id = client.create_image(Name="testami", InstanceId=instance_id)[ },
"ImageId" ],
] )
client.create_tags(Resources=[image_id], Tags=[{"Key": "ami", "Value": "test"}]) instance_id = instances["Instances"][0]["InstanceId"]
image_id = client.create_image(Name="testami", InstanceId=instance_id)[
"ImageId"
]
client.create_tags(
Resources=[image_id], Tags=[{"Key": "ami", "Value": "test"}]
)
mock_current_audit_info.audited_regions = ["eu-central-1"] mock_audit_info.audited_regions = ["eu-central-1"]
mock_current_audit_info.audit_session = boto3.session.Session() mock_audit_info.audit_session = boto3.session.Session()
assert len(get_tagged_resources(["ami=test"], mock_current_audit_info)) == 2 assert len(get_tagged_resources(["ami=test"], mock_audit_info)) == 2
assert image_id in str( assert image_id in str(get_tagged_resources(["ami=test"], mock_audit_info))
get_tagged_resources(["ami=test"], mock_current_audit_info) assert instance_id in str(
) get_tagged_resources(["ami=test"], mock_audit_info)
assert instance_id in str( )
get_tagged_resources(["ami=test"], mock_current_audit_info) assert (
) len(get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_audit_info)) == 1
assert ( )
len(get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_current_audit_info)) assert instance_id in str(
== 1 get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_audit_info)
) )
assert instance_id in str(
get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_current_audit_info)
)