chore(resource-based scan): execute only applicable checks (#1934)

This commit is contained in:
Sergio Garcia
2023-02-23 13:30:21 +01:00
committed by GitHub
parent 4b935a40b6
commit 849b703828
3 changed files with 74 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ from prowler.lib.check.check import (
exclude_services_to_run,
execute_checks,
get_checks_from_input_arn,
get_regions_from_audit_resources,
list_categories,
list_services,
print_categories,
@@ -136,6 +137,9 @@ def prowler():
# Once the audit_info is set and we have the eventual checks from arn, it is time to exclude the others
if audit_info.audit_resources:
audit_info.audited_regions = get_regions_from_audit_resources(
audit_info.audit_resources
)
checks_to_execute = get_checks_from_input_arn(
audit_info.audit_resources, provider
)

View File

@@ -511,19 +511,61 @@ def get_checks_from_input_arn(audit_resources: list, provider: str) -> set:
checks_from_arn = set()
# Handle if there are audit resources so only their services are executed
if audit_resources:
service_list = []
services_without_subservices = ["guardduty", "kms", "s3", "elb"]
service_list = set()
sub_service_list = set()
for resource in audit_resources:
service = resource.split(":")[2]
# Parse services when they are different in the ARNs
if service == "lambda":
service = "awslambda"
if service == "elasticloadbalancing":
service = "elb"
elif service == "logs":
service = "cloudwatch"
service_list.append(service)
sub_service = resource.split(":")[5].split("/")[0].replace("-", "_")
checks_from_arn = recover_checks_from_service(service_list, provider)
if (
service != "wafv2" and service != "waf"
): # WAF Services does not have checks
# Parse services when they are different in the ARNs
if service == "lambda":
service = "awslambda"
if service == "elasticloadbalancing":
service = "elb"
elif service == "logs":
service = "cloudwatch"
service_list.add(service)
# Get subservices to execute only applicable checks
if service not in services_without_subservices:
# Parse some specific subservices
if service == "ec2":
if sub_service == "security_group":
sub_service = "securitygroup"
if sub_service == "network_acl":
sub_service = "networkacl"
if sub_service == "image":
sub_service = "ami"
if service == "rds":
if sub_service == "cluster_snapshot":
sub_service = "snapshot"
sub_service_list.add(sub_service)
else:
sub_service_list.add(service)
checks = recover_checks_from_service(service_list, provider)
# Filter only checks with audited subservices
for check in checks:
if any(sub_service in check for sub_service in sub_service_list):
if not (sub_service == "policy" and "password_policy" in check):
checks_from_arn.add(check)
# Return final checks list
return checks_from_arn
return sorted(checks_from_arn)
def get_regions_from_audit_resources(audit_resources: list) -> list:
"""get_regions_from_audit_resources gets the regions from the audit resources arns"""
audited_regions = []
for resource in audit_resources:
region = resource.split(":")[3]
if region and region not in audited_regions: # Check if arn has a region
audited_regions.append(region)
if audited_regions:
return audited_regions
return None

View File

@@ -9,6 +9,7 @@ from prowler.lib.check.check import (
exclude_checks_to_run,
exclude_services_to_run,
get_checks_from_input_arn,
get_regions_from_audit_resources,
list_modules,
list_services,
parse_checks_from_file,
@@ -288,14 +289,27 @@ class Test_Check:
def test_get_checks_from_input_arn(self):
audit_resources = ["arn:aws:lambda:us-east-1:123456789:function:test-lambda"]
provider = "aws"
expected_checks = {
"awslambda_function_url_cors_policy",
expected_checks = [
"awslambda_function_invoke_api_operations_cloudtrail_logging_enabled",
"awslambda_function_no_secrets_in_code",
}
"awslambda_function_url_cors_policy",
]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
def test_get_regions_from_audit_resources(self):
audit_resources = [
"arn:aws:lambda:us-east-1:123456789:function:test-lambda",
"arn:aws:iam::106908755756:policy/test",
"arn:aws:ec2:eu-west-1:106908755756:security-group/sg-test",
]
expected_regions = [
"us-east-1",
"eu-west-1",
]
recovered_regions = get_regions_from_audit_resources(audit_resources)
assert recovered_regions == expected_regions
# def test_parse_checks_from_compliance_framework_two(self):
# test_case = {
# "input": {"compliance_frameworks": ["cis_v1.4_aws", "ens_v3_aws"]},