From efa75a62e30cf8d2486663f38ab5a8fed4fb1464 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Thu, 3 Aug 2023 10:40:51 +0200 Subject: [PATCH] fix(iam_policy_allows_privilege_escalation): Handle permissions in groups (#2655) --- .../iam_policy_allows_privilege_escalation.py | 203 ++++-- ...policy_allows_privilege_escalation_test.py | 617 ++++++++++++++++-- 2 files changed, 713 insertions(+), 107 deletions(-) diff --git a/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py b/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py index cf74ec76..15695f9b 100644 --- a/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py +++ b/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py @@ -1,3 +1,5 @@ +from re import search + from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.iam.iam_client import iam_client @@ -11,63 +13,94 @@ from prowler.providers.aws.services.iam.iam_client import iam_client # Does the tool handle Condition constraints? --> Not yet. # Does the tool handle service control policy (SCP) restrictions? --> No, SCP are within Organizations AWS API. +# Based on: +# - https://bishopfox.com/blog/privilege-escalation-in-aws +# - https://github.com/RhinoSecurityLabs/Security-Research/blob/master/tools/aws-pentest-tools/aws_escalate.py +# - https://rhinosecuritylabs.com/aws/aws-privilege-escalation-methods-mitigation/ + class iam_policy_allows_privilege_escalation(Check): def execute(self) -> Check_Report_AWS: - # Is necessary to include the "Action:*" for - # each service that has a policy that could - # allow for privilege escalation - privilege_escalation_iam_actions = { - "iam:AttachGroupPolicy", - "iam:SetDefaultPolicyVersion2", - "iam:AddUserToGroup", - "iam:AttachRolePolicy", - "iam:AttachUserPolicy", - "iam:CreateAccessKey", - "iam:CreatePolicyVersion", - "iam:CreateLoginProfile", - "iam:PassRole", - "iam:PutGroupPolicy", - "iam:PutRolePolicy", - "iam:PutUserPolicy", - "iam:SetDefaultPolicyVersion", - "iam:UpdateAssumeRolePolicy", - "iam:UpdateLoginProfile", - "iam:*", - "sts:AssumeRole", - "sts:*", - "ec2:RunInstances", - "ec2:*", - "lambda:CreateEventSourceMapping", - "lambda:CreateFunction", - "lambda:InvokeFunction", - "lambda:UpdateFunctionCode", - "lambda:*", - "dynamodb:CreateTable", - "dynamodb:PutItem", - "dynamodb:*", - "glue:CreateDevEndpoint", - "glue:GetDevEndpoint", - "glue:GetDevEndpoints", - "glue:UpdateDevEndpoint", - "glue:*", - "cloudformation:CreateStack", - "cloudformation:DescribeStacks", - "cloudformation:*", - "datapipeline:CreatePipeline", - "datapipeline:PutPipelineDefinition", - "datapipeline:ActivatePipeline", - "datapipeline:*", + privilege_escalation_policies_combination = { + "CreatePolicyVersion": {"iam:CreatePolicyVersion"}, + "SetDefaultPolicyVersion": {"iam:SetDefaultPolicyVersion"}, + "iam:PassRole": {"iam:PassRole"}, + "PassRole+EC2": { + "iam:PassRole", + "ec2:RunInstances", + }, + "PassRole+CreateLambda+Invoke": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:InvokeFunction", + }, + "PassRole+CreateLambda+ExistingDynamo": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:CreateEventSourceMapping", + }, + "PassRole+CreateLambda+NewDynamo": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:CreateEventSourceMapping", + "dynamodb:CreateTable", + "dynamodb:PutItem", + }, + "PassRole+GlueEndpoint": { + "iam:PassRole", + "glue:CreateDevEndpoint", + "glue:GetDevEndpoint", + }, + "PassRole+GlueEndpoints": { + "iam:PassRole", + "glue:CreateDevEndpoint", + "glue:GetDevEndpoints", + }, + "PassRole+CloudFormation": { + "cloudformation:CreateStack", + "cloudformation:DescribeStacks", + }, + "PassRole+DataPipeline": { + "datapipeline:CreatePipeline", + "datapipeline:PutPipelineDefinition", + "datapipeline:ActivatePipeline", + }, + "GlueUpdateDevEndpoint": {"glue:UpdateDevEndpoint"}, + "GlueUpdateDevEndpoints": {"glue:UpdateDevEndpoint"}, + "lambda:UpdateFunctionCode": {"lambda:UpdateFunctionCode"}, + "iam:CreateAccessKey": {"iam:CreateAccessKey"}, + "iam:CreateLoginProfile": {"iam:CreateLoginProfile"}, + "iam:UpdateLoginProfile": {"iam:UpdateLoginProfile"}, + "iam:AttachUserPolicy": {"iam:AttachUserPolicy"}, + "iam:AttachGroupPolicy": {"iam:AttachGroupPolicy"}, + "iam:AttachRolePolicy": {"iam:AttachRolePolicy"}, + "AssumeRole+AttachRolePolicy": {"sts:AssumeRole", "iam:AttachRolePolicy"}, + "iam:PutGroupPolicy": {"iam:PutGroupPolicy"}, + "iam:PutRolePolicy": {"iam:PutRolePolicy"}, + "AssumeRole+PutRolePolicy": {"sts:AssumeRole", "iam:PutRolePolicy"}, + "iam:PutUserPolicy": {"iam:PutUserPolicy"}, + "iam:AddUserToGroup": {"iam:AddUserToGroup"}, + "iam:UpdateAssumeRolePolicy": {"iam:UpdateAssumeRolePolicy"}, + "AssumeRole+UpdateAssumeRolePolicy": { + "sts:AssumeRole", + "iam:UpdateAssumeRolePolicy", + }, + # TO-DO: We have to handle AssumeRole just if the resource is * and without conditions + # "sts:AssumeRole": {"sts:AssumeRole"}, } + findings = [] + + # Iterate over all the IAM "Customer Managed" policies for policy in iam_client.policies: - # Check only custom policies if policy.type == "Custom": report = Check_Report_AWS(self.metadata()) report.resource_id = policy.name report.resource_arn = policy.arn report.region = iam_client.region report.resource_tags = policy.tags + report.status = "PASS" + report.status_extended = f"Custom Policy {report.resource_arn} does not allow privilege escalation" # List of policy actions allowed_actions = set() @@ -85,42 +118,74 @@ class iam_policy_allows_privilege_escalation(Check): if statements["Effect"] == "Allow": if "Action" in statements: if type(statements["Action"]) is str: - allowed_actions = {statements["Action"]} + allowed_actions.add(statements["Action"]) if type(statements["Action"]) is list: - allowed_actions = set(statements["Action"]) + allowed_actions.update(statements["Action"]) # Recover denied actions if statements["Effect"] == "Deny": if "Action" in statements: if type(statements["Action"]) is str: - denied_actions = {statements["Action"]} + denied_actions.add(statements["Action"]) if type(statements["Action"]) is list: - denied_actions = set(statements["Action"]) + denied_actions.update(statements["Action"]) if "NotAction" in statements: if type(statements["NotAction"]) is str: - denied_not_actions = {statements["NotAction"]} + denied_not_actions.add(statements["NotAction"]) if type(statements["NotAction"]) is list: - denied_not_actions = set(statements["NotAction"]) + denied_not_actions.update(statements["NotAction"]) - # First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS - left_actions = allowed_actions.difference(denied_actions) - # Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS - if denied_not_actions: - privileged_actions = left_actions.intersection(denied_not_actions) - # If there is no Denied Not Actions - else: - privileged_actions = left_actions - # Finally, check if there is a privilege escalation action within this policy - policy_privilege_escalation_actions = privileged_actions.intersection( - privilege_escalation_iam_actions - ) + # First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS + left_actions = allowed_actions.difference(denied_actions) + # Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS + if denied_not_actions: + privileged_actions = left_actions.intersection( + denied_not_actions + ) + # If there is no Denied Not Actions + else: + privileged_actions = left_actions - if len(policy_privilege_escalation_actions) == 0: - report.status = "PASS" - report.status_extended = f"Custom Policy {report.resource_arn} does not allow privilege escalation" - else: - report.status = "FAIL" - report.status_extended = f"Custom Policy {report.resource_arn} allows privilege escalation using the following actions: {policy_privilege_escalation_actions}" + # Store all the action's combinations + policies_combination = set() + + for values in privilege_escalation_policies_combination.values(): + for val in values: + val_set = set() + val_set.add(val) + # Look for specific api:action + if privileged_actions.intersection(val_set) == val_set: + policies_combination.add(val) + # Look for api:* + else: + for permission in privileged_actions: + api = permission.split(":")[0] + api_action = permission.split(":")[1] + + if api_action == "*": + if search(api, val): + policies_combination.add(val) + + # Check all policies combinations and see if matchs with some combo key + combos = set() + for ( + key, + values, + ) in privilege_escalation_policies_combination.items(): + intersection = policies_combination.intersection(values) + if intersection == values: + combos.add(key) + + if len(combos) != 0: + report.status = "FAIL" + policies_affected = "" + for key in combos: + policies_affected += ( + str(privilege_escalation_policies_combination[key]) + + " " + ) + + report.status_extended = f"Custom Policy {report.resource_arn} allows privilege escalation using the following actions: {policies_affected}".rstrip() findings.append(report) return findings diff --git a/tests/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation_test.py b/tests/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation_test.py index 14600848..89b34f43 100644 --- a/tests/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation_test.py +++ b/tests/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation_test.py @@ -1,4 +1,5 @@ from json import dumps +from re import search from unittest import mock from boto3 import client, session @@ -10,6 +11,75 @@ from prowler.providers.common.models import Audit_Metadata AWS_REGION = "us-east-1" AWS_ACCOUNT_NUMBER = "123456789012" +# Keep this up-to-date with the check's actions that allows for privilege escalation +privilege_escalation_policies_combination = { + "CreatePolicyVersion": {"iam:CreatePolicyVersion"}, + "SetDefaultPolicyVersion": {"iam:SetDefaultPolicyVersion"}, + "iam:PassRole": {"iam:PassRole"}, + "PassRole+EC2": { + "iam:PassRole", + "ec2:RunInstances", + }, + "PassRole+CreateLambda+Invoke": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:InvokeFunction", + }, + "PassRole+CreateLambda+ExistingDynamo": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:CreateEventSourceMapping", + }, + "PassRole+CreateLambda+NewDynamo": { + "iam:PassRole", + "lambda:CreateFunction", + "lambda:CreateEventSourceMapping", + "dynamodb:CreateTable", + "dynamodb:PutItem", + }, + "PassRole+GlueEndpoint": { + "iam:PassRole", + "glue:CreateDevEndpoint", + "glue:GetDevEndpoint", + }, + "PassRole+GlueEndpoints": { + "iam:PassRole", + "glue:CreateDevEndpoint", + "glue:GetDevEndpoints", + }, + "PassRole+CloudFormation": { + "cloudformation:CreateStack", + "cloudformation:DescribeStacks", + }, + "PassRole+DataPipeline": { + "datapipeline:CreatePipeline", + "datapipeline:PutPipelineDefinition", + "datapipeline:ActivatePipeline", + }, + "GlueUpdateDevEndpoint": {"glue:UpdateDevEndpoint"}, + "GlueUpdateDevEndpoints": {"glue:UpdateDevEndpoint"}, + "lambda:UpdateFunctionCode": {"lambda:UpdateFunctionCode"}, + "iam:CreateAccessKey": {"iam:CreateAccessKey"}, + "iam:CreateLoginProfile": {"iam:CreateLoginProfile"}, + "iam:UpdateLoginProfile": {"iam:UpdateLoginProfile"}, + "iam:AttachUserPolicy": {"iam:AttachUserPolicy"}, + "iam:AttachGroupPolicy": {"iam:AttachGroupPolicy"}, + "iam:AttachRolePolicy": {"iam:AttachRolePolicy"}, + "AssumeRole+AttachRolePolicy": {"sts:AssumeRole", "iam:AttachRolePolicy"}, + "iam:PutGroupPolicy": {"iam:PutGroupPolicy"}, + "iam:PutRolePolicy": {"iam:PutRolePolicy"}, + "AssumeRole+PutRolePolicy": {"sts:AssumeRole", "iam:PutRolePolicy"}, + "iam:PutUserPolicy": {"iam:PutUserPolicy"}, + "iam:AddUserToGroup": {"iam:AddUserToGroup"}, + "iam:UpdateAssumeRolePolicy": {"iam:UpdateAssumeRolePolicy"}, + "AssumeRole+UpdateAssumeRolePolicy": { + "sts:AssumeRole", + "iam:UpdateAssumeRolePolicy", + }, + # TO-DO: We have to handle AssumeRole just if the resource is * and without conditions + # "sts:AssumeRole": {"sts:AssumeRole"}, +} + class Test_iam_policy_allows_privilege_escalation: def set_mocked_audit_info(self): @@ -43,45 +113,45 @@ class Test_iam_policy_allows_privilege_escalation: return audit_info - @mock_iam - def test_iam_policy_allows_privilege_escalation_sts(self): - iam_client = client("iam", region_name=AWS_REGION) - policy_name = "policy1" - policy_document = { - "Version": "2012-10-17", - "Statement": [ - {"Effect": "Allow", "Action": "sts:*", "Resource": "*"}, - ], - } - policy_arn = iam_client.create_policy( - PolicyName=policy_name, PolicyDocument=dumps(policy_document) - )["Policy"]["Arn"] + # @mock_iam + # def test_iam_policy_allows_privilege_escalation_sts(self): + # iam_client = client("iam", region_name=AWS_REGION) + # policy_name = "policy1" + # policy_document = { + # "Version": "2012-10-17", + # "Statement": [ + # {"Effect": "Allow", "Action": "sts:*", "Resource": "*"}, + # ], + # } + # policy_arn = iam_client.create_policy( + # PolicyName=policy_name, PolicyDocument=dumps(policy_document) + # )["Policy"]["Arn"] - current_audit_info = self.set_mocked_audit_info() - from prowler.providers.aws.services.iam.iam_service import IAM + # current_audit_info = self.set_mocked_audit_info() + # from prowler.providers.aws.services.iam.iam_service import IAM - with mock.patch( - "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", - new=current_audit_info, - ), mock.patch( - "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", - new=IAM(current_audit_info), - ): - # Test Check - from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( - iam_policy_allows_privilege_escalation, - ) + # with mock.patch( + # "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + # new=current_audit_info, + # ), mock.patch( + # "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + # new=IAM(current_audit_info), + # ): + # # Test Check + # from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + # iam_policy_allows_privilege_escalation, + # ) - check = iam_policy_allows_privilege_escalation() - result = check.execute() - assert len(result) == 1 - assert result[0].status == "FAIL" - assert ( - result[0].status_extended - == f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'sts:*'}}" - ) - assert result[0].resource_id == policy_name - assert result[0].resource_arn == policy_arn + # check = iam_policy_allows_privilege_escalation() + # result = check.execute() + # assert len(result) == 1 + # assert result[0].status == "FAIL" + # assert ( + # result[0].status_extended + # == f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'sts:AssumeRole'}}" + # ) + # assert result[0].resource_id == policy_name + # assert result[0].resource_arn == policy_arn @mock_iam def test_iam_policy_not_allows_privilege_escalation(self): @@ -220,10 +290,481 @@ class Test_iam_policy_allows_privilege_escalation: check = iam_policy_allows_privilege_escalation() result = check.execute() assert len(result) == 1 - assert result[0].status == "FAIL" + assert result[0].status == "PASS" assert ( result[0].status_extended - == f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'dynamodb:PutItem'}}" + == f"Custom Policy {policy_arn} does not allow privilege escalation" ) assert result[0].resource_id == policy_name assert result[0].resource_arn == policy_arn + + @mock_iam + def test_iam_policy_allows_privilege_escalation_iam_all_and_ec2_RunInstances( + self, + ): + iam_client = client("iam", region_name=AWS_REGION) + policy_name = "policy1" + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:*", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["ec2:RunInstances"], + "Resource": "*", + }, + ], + } + policy_arn = iam_client.create_policy( + PolicyName=policy_name, PolicyDocument=dumps(policy_document) + )["Policy"]["Arn"] + + current_audit_info = self.set_mocked_audit_info() + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == policy_name + assert result[0].resource_arn == policy_arn + + assert search( + f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ", + result[0].status_extended, + ) + assert search("iam:PassRole", result[0].status_extended) + assert search("ec2:RunInstances", result[0].status_extended) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_iam_PassRole( + self, + ): + iam_client = client("iam", region_name=AWS_REGION) + policy_name = "policy1" + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/ecs", + } + ], + } + policy_arn = iam_client.create_policy( + PolicyName=policy_name, PolicyDocument=dumps(policy_document) + )["Policy"]["Arn"] + + current_audit_info = self.set_mocked_audit_info() + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == policy_name + assert result[0].resource_arn == policy_arn + + assert search( + f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ", + result[0].status_extended, + ) + assert search("iam:PassRole", result[0].status_extended) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_two_combinations( + self, + ): + iam_client = client("iam", region_name=AWS_REGION) + policy_name = "policy1" + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:PassRole", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["ec2:RunInstances"], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": [ + "lambda:CreateFunction", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["lambda:InvokeFunction"], + "Resource": "*", + }, + ], + } + policy_arn = iam_client.create_policy( + PolicyName=policy_name, PolicyDocument=dumps(policy_document) + )["Policy"]["Arn"] + + current_audit_info = self.set_mocked_audit_info() + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == policy_name + assert result[0].resource_arn == policy_arn + + assert search( + f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ", + result[0].status_extended, + ) + assert search("iam:PassRole", result[0].status_extended) + assert search("lambda:InvokeFunction", result[0].status_extended) + assert search("lambda:CreateFunction", result[0].status_extended) + assert search("ec2:RunInstances", result[0].status_extended) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_iam_PassRole_and_other_actions( + self, + ): + iam_client = client("iam", region_name=AWS_REGION) + policy_name = "policy1" + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "iam:PassRole", + "Resource": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/ecs", + }, + { + "Action": "account:GetAccountInformation", + "Effect": "Allow", + "Resource": "*", + }, + ], + } + policy_arn = iam_client.create_policy( + PolicyName=policy_name, PolicyDocument=dumps(policy_document) + )["Policy"]["Arn"] + + current_audit_info = self.set_mocked_audit_info() + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == policy_name + assert result[0].resource_arn == policy_arn + + assert search( + f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ", + result[0].status_extended, + ) + assert search("iam:PassRole", result[0].status_extended) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_policies_combination( + self, + ): + current_audit_info = self.set_mocked_audit_info() + iam_client = client("iam", region_name=AWS_REGION) + policy_name = "privileged_policy" + for values in privilege_escalation_policies_combination.values(): + print(list(values)) + # We create a new statement in each loop with the combinations required to allow the privilege escalation + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": list(values), + "Resource": "*", + }, + ], + } + policy_arn = iam_client.create_policy( + PolicyName=policy_name, PolicyDocument=dumps(policy_document) + )["Policy"]["Arn"] + + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == policy_name + assert result[0].resource_arn == policy_arn + + assert search( + f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ", + result[0].status_extended, + ) + + # Check the actions that allow for privilege escalation + for action in values: + assert search(action, result[0].status_extended) + + # Delete each IAM policy after the test + iam_client.delete_policy(PolicyArn=policy_arn) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_two_policies_one_good_one_bad( + self, + ): + current_audit_info = self.set_mocked_audit_info() + iam_client = client("iam", region_name=AWS_REGION) + policy_name_1 = "privileged_policy_1" + policy_document_1 = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["ec2:RunInstances"], + "Resource": "*", + }, + ], + } + policy_name_2 = "privileged_policy_2" + policy_document_2 = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:PassRole", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": [ + "lambda:CreateFunction", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["lambda:InvokeFunction"], + "Resource": "*", + }, + ], + } + policy_arn_1 = iam_client.create_policy( + PolicyName=policy_name_1, PolicyDocument=dumps(policy_document_1) + )["Policy"]["Arn"] + + policy_arn_2 = iam_client.create_policy( + PolicyName=policy_name_2, PolicyDocument=dumps(policy_document_2) + )["Policy"]["Arn"] + + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 2 + for finding in result: + if finding.resource_id == policy_name_1: + assert finding.status == "PASS" + assert finding.resource_arn == policy_arn_1 + assert ( + finding.status_extended + == f"Custom Policy {policy_arn_1} does not allow privilege escalation" + ) + + if finding.resource_id == policy_name_2: + assert finding.status == "FAIL" + assert finding.resource_arn == policy_arn_2 + + assert search( + f"Custom Policy {policy_arn_2} allows privilege escalation using the following actions: ", + finding.status_extended, + ) + assert search("iam:PassRole", finding.status_extended) + assert search("lambda:InvokeFunction", finding.status_extended) + assert search("lambda:CreateFunction", finding.status_extended) + + @mock_iam + def test_iam_policy_allows_privilege_escalation_two_bad_policies( + self, + ): + current_audit_info = self.set_mocked_audit_info() + iam_client = client("iam", region_name=AWS_REGION) + policy_name_1 = "privileged_policy_1" + policy_document_1 = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:PassRole", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["ec2:RunInstances"], + "Resource": "*", + }, + ], + } + policy_name_2 = "privileged_policy_2" + policy_document_2 = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:PassRole", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": [ + "lambda:CreateFunction", + ], + "Resource": "*", + }, + { + "Effect": "Allow", + "Action": ["lambda:InvokeFunction"], + "Resource": "*", + }, + ], + } + policy_arn_1 = iam_client.create_policy( + PolicyName=policy_name_1, PolicyDocument=dumps(policy_document_1) + )["Policy"]["Arn"] + + policy_arn_2 = iam_client.create_policy( + PolicyName=policy_name_2, PolicyDocument=dumps(policy_document_2) + )["Policy"]["Arn"] + + from prowler.providers.aws.services.iam.iam_service import IAM + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client", + new=IAM(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import ( + iam_policy_allows_privilege_escalation, + ) + + check = iam_policy_allows_privilege_escalation() + result = check.execute() + assert len(result) == 2 + for finding in result: + if finding.resource_id == policy_name_1: + assert finding.status == "FAIL" + assert finding.resource_arn == policy_arn_1 + + assert search( + f"Custom Policy {policy_arn_1} allows privilege escalation using the following actions: ", + finding.status_extended, + ) + + assert search("iam:PassRole", finding.status_extended) + assert search("ec2:RunInstances", finding.status_extended) + + if finding.resource_id == policy_name_2: + assert finding.status == "FAIL" + assert finding.resource_arn == policy_arn_2 + + assert search( + f"Custom Policy {policy_arn_2} allows privilege escalation using the following actions: ", + finding.status_extended, + ) + assert search("iam:PassRole", finding.status_extended) + assert search("lambda:InvokeFunction", finding.status_extended) + assert search("lambda:CreateFunction", finding.status_extended)