From b1968f3f8b1e8080096963d63f0cbdc7eaffcc64 Mon Sep 17 00:00:00 2001 From: Nacho Rivera Date: Thu, 6 Jul 2023 15:33:32 +0200 Subject: [PATCH] fix(allowlist): reformat allowlist logic (#2555) Co-authored-by: Sergio Garcia --- .../providers/aws/lib/allowlist/allowlist.py | 146 ++++++------ .../aws/lib/allowlist/allowlist_test.py | 207 +++++++----------- 2 files changed, 148 insertions(+), 205 deletions(-) diff --git a/prowler/providers/aws/lib/allowlist/allowlist.py b/prowler/providers/aws/lib/allowlist/allowlist.py index 3fb82e81..142b338e 100644 --- a/prowler/providers/aws/lib/allowlist/allowlist.py +++ b/prowler/providers/aws/lib/allowlist/allowlist.py @@ -115,20 +115,22 @@ def parse_allowlist_file(audit_info, allowlist_file): def is_allowlisted(allowlist, audited_account, check, region, resource, tags): try: + # By default is not allowlisted + is_finding_allowlisted = False + # First set account key from allowlist dict if audited_account in allowlist["Accounts"]: account = audited_account - if is_allowlisted_in_check( - allowlist, audited_account, account, check, region, resource, tags - ): - return True # If there is a *, it affects to all accounts - if "*" in allowlist["Accounts"]: + elif "*" in allowlist["Accounts"]: account = "*" - if is_allowlisted_in_check( - allowlist, audited_account, account, check, region, resource, tags - ): - return True - return False + # Test if it is allowlisted + allowlisted_checks = allowlist["Accounts"][account]["Checks"] + if is_allowlisted_in_check( + allowlisted_checks, audited_account, account, check, region, resource, tags + ): + is_finding_allowlisted = True + + return is_finding_allowlisted except Exception as error: logger.critical( f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]" @@ -137,55 +139,47 @@ def is_allowlisted(allowlist, audited_account, check, region, resource, tags): def is_allowlisted_in_check( - allowlist, audited_account, account, check, region, resource, tags + allowlisted_checks, audited_account, account, check, region, resource, tags ): try: - allowlisted_checks = allowlist["Accounts"][account]["Checks"] - for allowlisted_check in allowlisted_checks.keys(): - if re.search("^lambda", allowlisted_check): - mapped_check = re.sub("^lambda", "awslambda", allowlisted_check) - # we update the dictionary - allowlisted_checks[mapped_check] = allowlisted_checks.pop( - allowlisted_check - ) - # and the single element - allowlisted_check = mapped_check - + # Default value is not allowlisted + is_check_allowlisted = False + for allowlisted_check, allowlisted_check_info in allowlisted_checks.items(): + # map lambda to awslambda + allowlisted_check = re.sub("^lambda", "awslambda", allowlisted_check) + # extract the exceptions + exceptions = allowlisted_check_info.get("Exceptions") # Check if there are exceptions if is_excepted( - allowlisted_checks, - allowlisted_check, + exceptions, audited_account, region, resource, tags, ): - return False + # Break loop and return default value since is excepted + break + + allowlisted_regions = allowlisted_check_info.get("Regions") + allowlisted_resources = allowlisted_check_info.get("Resources") + allowlisted_tags = allowlisted_check_info.get("Tags") # If there is a *, it affects to all checks - if "*" == allowlisted_check: - check = "*" + if ( + "*" == allowlisted_check + or check == allowlisted_check + or re.search(allowlisted_check, check) + ): if is_allowlisted_in_region( - allowlist, account, check, region, resource, tags - ): - return True - # Check if there is the specific check - elif check == allowlisted_check: - if is_allowlisted_in_region( - allowlist, account, check, region, resource, tags - ): - return True - # Check if check is a regex - elif re.search(allowlisted_check, check): - if is_allowlisted_in_region( - allowlist, - account, - allowlisted_check, + allowlisted_regions, + allowlisted_resources, + allowlisted_tags, region, resource, tags, ): - return True - return False + is_check_allowlisted = True + + return is_check_allowlisted except Exception as error: logger.critical( f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]" @@ -193,28 +187,26 @@ def is_allowlisted_in_check( sys.exit(1) -def is_allowlisted_in_region(allowlist, account, check, region, resource, tags): +def is_allowlisted_in_region( + allowlist_regions, allowlist_resources, allowlisted_tags, region, resource, tags +): try: + # By default is not allowlisted + is_region_allowlisted = False # If there is a *, it affects to all regions - if "*" in allowlist["Accounts"][account]["Checks"][check]["Regions"]: - for elem in allowlist["Accounts"][account]["Checks"][check]["Resources"]: + if "*" in allowlist_regions or region in allowlist_regions: + for elem in allowlist_resources: if is_allowlisted_in_tags( - allowlist["Accounts"][account]["Checks"][check], + allowlisted_tags, elem, resource, tags, ): - return True - # Check if there is the specific region - if region in allowlist["Accounts"][account]["Checks"][check]["Regions"]: - for elem in allowlist["Accounts"][account]["Checks"][check]["Resources"]: - if is_allowlisted_in_tags( - allowlist["Accounts"][account]["Checks"][check], - elem, - resource, - tags, - ): - return True + is_region_allowlisted = True + # if we find the element there is no point in continuing with the loop + break + + return is_region_allowlisted except Exception as error: logger.critical( f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]" @@ -222,31 +214,25 @@ def is_allowlisted_in_region(allowlist, account, check, region, resource, tags): sys.exit(1) -def is_allowlisted_in_tags(check_allowlist, elem, resource, tags): +def is_allowlisted_in_tags(allowlisted_tags, elem, resource, tags): try: + # By default is not allowlisted + is_tag_allowlisted = False # Check if it is an * if elem == "*": elem = ".*" # Check if there are allowlisted tags - if "Tags" in check_allowlist: - # Check if there are resource tags - if not tags or not re.search(elem, resource): - return False - - all_allowed_tags_in_resource_tags = True - for allowed_tag in check_allowlist["Tags"]: - found_allowed_tag = False - if re.search(allowed_tag, tags): - found_allowed_tag = True - - if not found_allowed_tag: - all_allowed_tags_in_resource_tags = False + if allowlisted_tags: + for allowlisted_tag in allowlisted_tags: + if re.search(allowlisted_tag, tags): + is_tag_allowlisted = True break - return all_allowed_tags_in_resource_tags else: if re.search(elem, resource): - return True + is_tag_allowlisted = True + + return is_tag_allowlisted except Exception as error: logger.critical( f"{error.__class__.__name__} -- {error}[{error.__traceback__.tb_lineno}]" @@ -254,16 +240,13 @@ def is_allowlisted_in_tags(check_allowlist, elem, resource, tags): sys.exit(1) -def is_excepted( - allowlisted_checks, allowlisted_check, audited_account, region, resource, tags -): +def is_excepted(exceptions, audited_account, region, resource, tags): try: excepted = False is_account_excepted = False is_region_excepted = False is_resource_excepted = False is_tag_excepted = False - exceptions = allowlisted_checks[allowlisted_check].get("Exceptions") if exceptions: excepted_accounts = exceptions.get("Accounts", []) excepted_regions = exceptions.get("Regions", []) @@ -277,8 +260,9 @@ def is_excepted( for excepted_resource in excepted_resources: if re.search(excepted_resource, resource): is_resource_excepted = True - if tags in excepted_tags: - is_tag_excepted = True + for tag in excepted_tags: + if tag in tags: + is_tag_excepted = True if ( ( (excepted_accounts and is_account_excepted) diff --git a/tests/providers/aws/lib/allowlist/allowlist_test.py b/tests/providers/aws/lib/allowlist/allowlist_test.py index 8313a9bd..c02713d2 100644 --- a/tests/providers/aws/lib/allowlist/allowlist_test.py +++ b/tests/providers/aws/lib/allowlist/allowlist_test.py @@ -262,54 +262,52 @@ class Test_Allowlist: def test_is_allowlisted_in_region(self): # Allowlist example - allowlist = { - "Accounts": { - AWS_ACCOUNT_NUMBER: { - "Checks": { - "check_test": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - } - } - } - } - } + allowlisted_regions = ["us-east-1", "eu-west-1"] + allowlisted_resources = ["*"] assert is_allowlisted_in_region( - allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler", "" + allowlisted_regions, allowlisted_resources, None, AWS_REGION, "prowler", "" ) assert is_allowlisted_in_region( - allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "prowler-test", "" + allowlisted_regions, + allowlisted_resources, + None, + AWS_REGION, + "prowler-test", + "", ) assert is_allowlisted_in_region( - allowlist, AWS_ACCOUNT_NUMBER, "check_test", AWS_REGION, "test-prowler", "" + allowlisted_regions, + allowlisted_resources, + None, + AWS_REGION, + "test-prowler", + "", ) assert not ( is_allowlisted_in_region( - allowlist, AWS_ACCOUNT_NUMBER, "check_test", "us-east-2", "test", "" + allowlisted_regions, + allowlisted_resources, + None, + "us-east-2", + "test", + "", ) ) def test_is_allowlisted_in_check(self): - # Allowlist example - allowlist = { - "Accounts": { - AWS_ACCOUNT_NUMBER: { - "Checks": { - "check_test": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - } - } - } + allowlisted_checks = { + "check_test": { + "Regions": ["us-east-1", "eu-west-1"], + "Resources": ["*"], } } assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "check_test", @@ -319,7 +317,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "check_test", @@ -329,7 +327,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "check_test", @@ -340,7 +338,7 @@ class Test_Allowlist: assert not ( is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "check_test", @@ -352,21 +350,15 @@ class Test_Allowlist: def test_is_allowlisted_in_check_regex(self): # Allowlist example - allowlist = { - "Accounts": { - AWS_ACCOUNT_NUMBER: { - "Checks": { - "s3_*": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - } - } - } + allowlisted_checks = { + "s3_*": { + "Regions": ["us-east-1", "eu-west-1"], + "Resources": ["*"], } } assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "s3_bucket_public_access", @@ -376,7 +368,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "s3_bucket_no_mfa_delete", @@ -386,7 +378,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "s3_bucket_policy_public_write_access", @@ -397,7 +389,7 @@ class Test_Allowlist: assert not ( is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "iam_user_hardware_mfa_enabled", @@ -408,21 +400,15 @@ class Test_Allowlist: ) def test_is_allowlisted_lambda_generic_check(self): - allowlist = { - "Accounts": { - AWS_ACCOUNT_NUMBER: { - "Checks": { - "lambda_*": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - } - } - } + allowlisted_checks = { + "lambda_*": { + "Regions": ["us-east-1", "eu-west-1"], + "Resources": ["*"], } } assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_invoke_api_operations_cloudtrail_logging_enabled", @@ -432,7 +418,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_no_secrets_in_code", @@ -442,7 +428,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_no_secrets_in_variables", @@ -452,7 +438,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_not_publicly_accessible", @@ -462,7 +448,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_url_cors_policy", @@ -472,7 +458,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_url_public", @@ -482,7 +468,7 @@ class Test_Allowlist: ) assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_using_supported_runtimes", @@ -492,21 +478,15 @@ class Test_Allowlist: ) def test_is_allowlisted_lambda_concrete_check(self): - allowlist = { - "Accounts": { - AWS_ACCOUNT_NUMBER: { - "Checks": { - "lambda_function_no_secrets_in_variables": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - } - } - } + allowlisted_checks = { + "lambda_function_no_secrets_in_variables": { + "Regions": ["us-east-1", "eu-west-1"], + "Resources": ["*"], } } assert is_allowlisted_in_check( - allowlist, + allowlisted_checks, AWS_ACCOUNT_NUMBER, AWS_ACCOUNT_NUMBER, "awslambda_function_no_secrets_in_variables", @@ -531,7 +511,7 @@ class Test_Allowlist: } } - assert not is_allowlisted( + assert is_allowlisted( allowlist, AWS_ACCOUNT_NUMBER, "check_test", @@ -561,84 +541,68 @@ class Test_Allowlist: ) def test_is_allowlisted_in_tags(self): - # Allowlist example - check_allowlist = { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - "Tags": ["environment=dev", "project=prowler"], - } + allowlist_tags = ["environment=dev", "project=prowler"] + allowlist_resource = "*" - assert not is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + assert is_allowlisted_in_tags( + allowlist_tags, + "*", "prowler", "environment=dev", ) assert is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + allowlist_tags, + allowlist_resource, "prowler-test", "environment=dev | project=prowler", ) assert not ( is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + allowlist_tags, + allowlist_resource, "test", "environment=pro", ) ) def test_is_allowlisted_in_tags_regex(self): - # Allowlist example - check_allowlist = { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - "Tags": ["environment=(dev|test)", ".*=prowler"], - } + allowlist_tags = ["environment=(dev|test)", ".*=prowler"] + allowlist_resource = "*" assert is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + allowlist_tags, + allowlist_resource, "prowler-test", "environment=test | proj=prowler", ) - assert not is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + assert is_allowlisted_in_tags( + allowlist_tags, + allowlist_resource, "prowler-test", "env=prod | project=prowler", ) assert not is_allowlisted_in_tags( - check_allowlist, - check_allowlist["Resources"][0], + allowlist_tags, + allowlist_resource, "prowler-test", "environment=prod | project=myproj", ) def test_is_excepted(self): # Allowlist example - check_allowlist = { - "check_test": { - "Regions": ["us-east-1", "eu-west-1"], - "Resources": ["*"], - "Tags": ["environment=dev"], - "Exceptions": { - "Accounts": [AWS_ACCOUNT_NUMBER], - "Regions": ["eu-central-1", "eu-south-3"], - "Resources": ["test"], - "Tags": ["environment=test", "project=.*"], - }, - } + exceptions = { + "Accounts": [AWS_ACCOUNT_NUMBER], + "Regions": ["eu-central-1", "eu-south-3"], + "Resources": ["test"], + "Tags": ["environment=test", "project=.*"], } assert is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-central-1", "test", @@ -646,8 +610,7 @@ class Test_Allowlist: ) assert is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-south-3", "test", @@ -655,8 +618,7 @@ class Test_Allowlist: ) assert is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-south-3", "test123", @@ -664,8 +626,7 @@ class Test_Allowlist: ) assert not is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-south-2", "test", @@ -673,8 +634,7 @@ class Test_Allowlist: ) assert not is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-south-3", "prowler", @@ -682,8 +642,7 @@ class Test_Allowlist: ) assert not is_excepted( - check_allowlist, - "check_test", + exceptions, AWS_ACCOUNT_NUMBER, "eu-south-3", "test",