feat(scan-type): AWS Resource ARNs based scan (#1807)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-02-01 14:09:22 +01:00
committed by GitHub
parent 360c6f3c1c
commit c7a9492e96
10 changed files with 129 additions and 46 deletions

View File

@@ -0,0 +1,9 @@
# Resource ARNs based Scan
Prowler allows you to scan only the resources with specific AWS Resource ARNs. This can be done with the flag `--resource-arn` followed by one or more [Amazon Resource Names (ARNs)](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html) separated by space:
```
prowler aws --resource-arn arn:aws:iam::012345678910:user/test arn:aws:ec2:us-east-1:123456789012:vpc/vpc-12345678
```
This example will only scan the two resources with those ARNs.

View File

@@ -1,9 +1,9 @@
# Tags-based Scan
Prowler allows you to scan only the resources that contain specific tags. This can be done with the flag `-t/--scan-tags` followed by the tags `Key=Value` separated by space:
Prowler allows you to scan only the resources that contain specific tags. This can be done with the flag `--resource-tags` followed by the tags `Key=Value` separated by space:
```
prowler aws --scan-tags Environment=dev Project=prowler
prowler aws --resource-tags Environment=dev Project=prowler
```
This example will only scan the resources that contains both tags.

View File

@@ -45,6 +45,7 @@ nav:
- AWS CloudShell: tutorials/aws/cloudshell.md
- Checks v2 to v3 Mapping: tutorials/aws/v2_to_v3_checks_mapping.md
- Tag-based Scan: tutorials/aws/tag-based-scan.md
- Resource ARNs based Scan: tutorials/aws/resource-arn-based-scan.md
- Azure:
- Authentication: tutorials/azure/authentication.md
- Subscriptions: tutorials/azure/subscriptions.md

View File

@@ -99,6 +99,9 @@ def prowler():
)
sys.exit()
# Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__)
# Load checks to execute
checks_to_execute = load_checks_to_execute(
bulk_checks_metadata,
@@ -110,13 +113,14 @@ def prowler():
compliance_framework,
categories,
provider,
audit_info,
)
# Exclude checks if -e/--excluded-checks
if excluded_checks:
checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks)
# Exclude services if -s/--excluded-services
# Exclude services if --excluded-services
if excluded_services:
checks_to_execute = exclude_services_to_run(
checks_to_execute, excluded_services, provider
@@ -130,9 +134,6 @@ def prowler():
print_checks(provider, checks_to_execute, bulk_checks_metadata)
sys.exit()
# Set the audit info based on the selected provider
audit_info = set_provider_audit_info(provider, args.__dict__)
# Parse content from Allowlist file and get it, if necessary, from S3
if provider == "aws" and args.allowlist_file:
allowlist_file = parse_allowlist_file(audit_info, args.allowlist_file)

View File

@@ -4,6 +4,7 @@ from prowler.lib.check.check import (
recover_checks_from_provider,
)
from prowler.lib.logger import logger
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
# Generate the list of checks to execute
@@ -18,10 +19,25 @@ def load_checks_to_execute(
compliance_frameworks: list,
categories: set,
provider: str,
audit_info: AWS_Audit_Info,
) -> set:
"""Generate the list of checks to execute based on the cloud provider and input arguments specified"""
checks_to_execute = set()
# Handle if there are audit resources so only their services are executed
if audit_info.audit_resources:
service_list = []
for resource in audit_info.audit_resources:
service = resource.split(":")[2]
# Parse services when they are different in the ARNs
if service == "lambda":
service = "awslambda"
if service == "elasticloadbalancing":
service = "elb"
elif service == "logs":
service = "cloudwatch"
service_list.append(service)
# Handle if there are checks passed using -c/--checks
if check_list:
for check_name in check_list:

View File

@@ -4,6 +4,14 @@ from argparse import RawTextHelpFormatter
from prowler.config.config import default_output_directory, prowler_version
from prowler.providers.aws.aws_provider import get_aws_available_regions
from prowler.providers.aws.lib.arn.arn import is_valid_arn
def arn_type(arn: str) -> bool:
"""arn_type returns a string ARN if it is valid and raises an argparse.ArgumentError if not."""
if not is_valid_arn(arn):
raise argparse.ArgumentError("Invalid ARN")
return arn
class ProwlerArgumentParser:
@@ -343,14 +351,23 @@ Detailed documentation at https://docs.prowler.cloud
default=None,
help="Path for allowlist yaml file. See example prowler/config/allowlist.yaml for reference and format. It also accepts AWS DynamoDB Table or Lambda ARNs or S3 URIs, see more in https://docs.prowler.cloud/en/latest/tutorials/allowlist/",
)
# Allowlist
audit_tags_subparser = aws_parser.add_argument_group("Tags-based Scan")
audit_tags_subparser.add_argument(
"-t",
"--scan-tags",
# Based Scans
aws_based_scans_subparser = aws_parser.add_argument_group("AWS Based Scans")
aws_based_scans_parser = (
aws_based_scans_subparser.add_mutually_exclusive_group()
)
aws_based_scans_parser.add_argument(
"--resource-tags",
nargs="+",
default=None,
help="Scan only resources with specific tags (Key=Value), e.g., Environment=dev Project=prowler",
help="Scan only resources with specific AWS Tags (Key=Value), e.g., Environment=dev Project=prowler",
)
aws_based_scans_parser.add_argument(
"--resource-arn",
nargs="+",
type=arn_type,
default=None,
help="Scan only resources with specific AWS Resource ARNs, e.g., arn:aws:iam::012345678910:user/test arn:aws:ec2:us-east-1:123456789012:vpc/vpc-12345678",
)
def __init_azure_parser__(self):

View File

@@ -1,3 +1,5 @@
import re
from arnparse import arnparse
from prowler.providers.aws.lib.arn.error import (
@@ -43,3 +45,9 @@ def arn_parsing(arn):
raise RoleArnParsingEmptyResource
else:
return arn_parsed
def is_valid_arn(arn: str) -> bool:
"""is_valid_arn returns True or False whether the given AWS ARN (Amazon Resource Name) is valid or not."""
regex = r"^arn:aws(-cn|-us-gov)?:[a-zA-Z0-9\-]+:([a-z]{2}-[a-z]+-\d{1})?:(\d{12})?:[a-zA-Z0-9\-_\/]+(:\d+)?$"
return re.match(regex, arn) is not None

View File

@@ -5,7 +5,11 @@ from boto3 import client, session
from colorama import Fore, Style
from prowler.lib.logger import logger
from prowler.providers.aws.aws_provider import AWS_Provider, assume_role
from prowler.providers.aws.aws_provider import (
AWS_Provider,
assume_role,
generate_regional_clients,
)
from prowler.providers.aws.lib.arn.arn import arn_parsing
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.lib.audit_info.models import (
@@ -237,10 +241,16 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
self.print_audit_credentials(current_audit_info)
# Parse Scan Tags
input_scan_tags = arguments.get("scan_tags")
current_audit_info.audit_resources = get_tagged_resources(
input_scan_tags, current_audit_info
)
if arguments.get("resource_tags"):
input_resource_tags = arguments.get("resource_tags")
current_audit_info.audit_resources = get_tagged_resources(
input_resource_tags, current_audit_info
)
# Parse Input Resource ARNs
if arguments.get("resource_arn"):
current_audit_info.audit_resources = arguments.get("resource_arn")
return current_audit_info
def set_azure_audit_info(self, arguments) -> Azure_Audit_Info:
@@ -294,27 +304,30 @@ def set_provider_audit_info(provider: str, arguments: dict):
return provider_audit_info
def get_tagged_resources(input_scan_tags: list, current_audit_info: AWS_Audit_Info):
def get_tagged_resources(input_resource_tags: list, current_audit_info: AWS_Audit_Info):
"""
get_tagged_resources returns a list of the resources that are going to be scanned based on the given input tags
"""
try:
scan_tags = []
resource_tags = []
tagged_resources = []
if input_scan_tags:
for tag in input_scan_tags:
key = tag.split("=")[0]
value = tag.split("=")[1]
scan_tags.append({"Key": key, "Values": [value]})
# Get Resources with scan_tags for all regions
for region in current_audit_info.audited_regions:
client = current_audit_info.audit_session.client(
"resourcegroupstaggingapi", region_name=region
)
get_resources_paginator = client.get_paginator("get_resources")
for page in get_resources_paginator.paginate(TagFilters=scan_tags):
for tag in input_resource_tags:
key = tag.split("=")[0]
value = tag.split("=")[1]
resource_tags.append({"Key": key, "Values": [value]})
# Get Resources with resource_tags for all regions
for regional_client in generate_regional_clients(
"resourcegroupstaggingapi", current_audit_info
).values():
try:
get_resources_paginator = regional_client.get_paginator("get_resources")
for page in get_resources_paginator.paginate(TagFilters=resource_tags):
for resource in page["ResourceTagMappingList"]:
tagged_resources.append(resource["ResourceARN"])
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -54,7 +54,7 @@ class Test_Parser:
assert not parsed.output_bucket_no_assume
assert not parsed.shodan
assert not parsed.allowlist_file
assert not parsed.scan_tags
assert not parsed.resource_tags
def test_default_parser_no_arguments_azure(self):
provider = "azure"
@@ -796,23 +796,33 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.allowlist_file == allowlist_file
def test_aws_parser_scan_tags_short(self):
argument = "-t"
scan_tag = "Key=Value"
command = [prowler_command, argument, scan_tag]
parsed = self.parser.parse(command)
assert len(parsed.scan_tags) == 1
assert scan_tag in parsed.scan_tags
def test_aws_parser_scan_tags_long(self):
argument = "--scan-tags"
def test_aws_parser_resource_tags(self):
argument = "--resource-tags"
scan_tag1 = "Key=Value"
scan_tag2 = "Key2=Value2"
command = [prowler_command, argument, scan_tag1, scan_tag2]
parsed = self.parser.parse(command)
assert len(parsed.scan_tags) == 2
assert scan_tag1 in parsed.scan_tags
assert scan_tag2 in parsed.scan_tags
assert len(parsed.resource_tags) == 2
assert scan_tag1 in parsed.resource_tags
assert scan_tag2 in parsed.resource_tags
def test_aws_parser_resource_arn(self):
argument = "--resource-arn"
resource_arn1 = "arn:aws:iam::012345678910:user/test"
resource_arn2 = "arn:aws:ec2:us-east-1:123456789012:vpc/vpc-12345678"
command = [prowler_command, argument, resource_arn1, resource_arn2]
parsed = self.parser.parse(command)
assert len(parsed.resource_arn) == 2
assert resource_arn1 in parsed.resource_arn
assert resource_arn2 in parsed.resource_arn
def test_aws_parser_wrong_resource_arn(self):
argument = "--resource-arn"
resource_arn = "arn:azure:iam::account:user/test"
command = [prowler_command, argument, resource_arn]
with pytest.raises(SystemExit) as ex:
self.parser.parse(command)
assert ex.type == SystemExit
def test_parser_azure_auth_sp(self):
argument = "--sp-env-auth"

View File

@@ -1,6 +1,6 @@
import sure # noqa
from prowler.providers.aws.lib.arn.arn import arn_parsing
from prowler.providers.aws.lib.arn.arn import arn_parsing, is_valid_arn
ACCOUNT_ID = "123456789012"
RESOURCE_TYPE = "role"
@@ -31,3 +31,11 @@ class Test_ARN_Parsing:
parsed_arn.account_id.should.equal(test["expected"]["account_id"])
parsed_arn.resource_type.should.equal(test["expected"]["resource_type"])
parsed_arn.resource.should.equal(test["expected"]["resource"])
def test_is_valid_arn(self):
assert is_valid_arn("arn:aws:iam::012345678910:user/test")
assert is_valid_arn("arn:aws-cn:ec2:us-east-1:123456789012:vpc/vpc-12345678")
assert is_valid_arn("arn:aws-us-gov:s3:::bucket")
assert not is_valid_arn("arn:azure:::012345678910:user/test")
assert not is_valid_arn("arn:aws:iam::account:user/test")
assert not is_valid_arn("arn:aws:::012345678910:resource")