fix(iam_policy_allows_privilege_escalation): Handle permissions in groups (#2655)

This commit is contained in:
Pepe Fagoaga
2023-08-03 10:40:51 +02:00
committed by GitHub
parent 5763bca317
commit efa75a62e3
2 changed files with 713 additions and 107 deletions

View File

@@ -1,3 +1,5 @@
from re import search
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.iam.iam_client import iam_client
@@ -11,63 +13,94 @@ from prowler.providers.aws.services.iam.iam_client import iam_client
# Does the tool handle Condition constraints? --> Not yet.
# Does the tool handle service control policy (SCP) restrictions? --> No, SCP are within Organizations AWS API.
# Based on:
# - https://bishopfox.com/blog/privilege-escalation-in-aws
# - https://github.com/RhinoSecurityLabs/Security-Research/blob/master/tools/aws-pentest-tools/aws_escalate.py
# - https://rhinosecuritylabs.com/aws/aws-privilege-escalation-methods-mitigation/
class iam_policy_allows_privilege_escalation(Check):
def execute(self) -> Check_Report_AWS:
# Is necessary to include the "Action:*" for
# each service that has a policy that could
# allow for privilege escalation
privilege_escalation_iam_actions = {
"iam:AttachGroupPolicy",
"iam:SetDefaultPolicyVersion2",
"iam:AddUserToGroup",
"iam:AttachRolePolicy",
"iam:AttachUserPolicy",
"iam:CreateAccessKey",
"iam:CreatePolicyVersion",
"iam:CreateLoginProfile",
"iam:PassRole",
"iam:PutGroupPolicy",
"iam:PutRolePolicy",
"iam:PutUserPolicy",
"iam:SetDefaultPolicyVersion",
"iam:UpdateAssumeRolePolicy",
"iam:UpdateLoginProfile",
"iam:*",
"sts:AssumeRole",
"sts:*",
"ec2:RunInstances",
"ec2:*",
"lambda:CreateEventSourceMapping",
"lambda:CreateFunction",
"lambda:InvokeFunction",
"lambda:UpdateFunctionCode",
"lambda:*",
"dynamodb:CreateTable",
"dynamodb:PutItem",
"dynamodb:*",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoint",
"glue:GetDevEndpoints",
"glue:UpdateDevEndpoint",
"glue:*",
"cloudformation:CreateStack",
"cloudformation:DescribeStacks",
"cloudformation:*",
"datapipeline:CreatePipeline",
"datapipeline:PutPipelineDefinition",
"datapipeline:ActivatePipeline",
"datapipeline:*",
privilege_escalation_policies_combination = {
"CreatePolicyVersion": {"iam:CreatePolicyVersion"},
"SetDefaultPolicyVersion": {"iam:SetDefaultPolicyVersion"},
"iam:PassRole": {"iam:PassRole"},
"PassRole+EC2": {
"iam:PassRole",
"ec2:RunInstances",
},
"PassRole+CreateLambda+Invoke": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:InvokeFunction",
},
"PassRole+CreateLambda+ExistingDynamo": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:CreateEventSourceMapping",
},
"PassRole+CreateLambda+NewDynamo": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:CreateEventSourceMapping",
"dynamodb:CreateTable",
"dynamodb:PutItem",
},
"PassRole+GlueEndpoint": {
"iam:PassRole",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoint",
},
"PassRole+GlueEndpoints": {
"iam:PassRole",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoints",
},
"PassRole+CloudFormation": {
"cloudformation:CreateStack",
"cloudformation:DescribeStacks",
},
"PassRole+DataPipeline": {
"datapipeline:CreatePipeline",
"datapipeline:PutPipelineDefinition",
"datapipeline:ActivatePipeline",
},
"GlueUpdateDevEndpoint": {"glue:UpdateDevEndpoint"},
"GlueUpdateDevEndpoints": {"glue:UpdateDevEndpoint"},
"lambda:UpdateFunctionCode": {"lambda:UpdateFunctionCode"},
"iam:CreateAccessKey": {"iam:CreateAccessKey"},
"iam:CreateLoginProfile": {"iam:CreateLoginProfile"},
"iam:UpdateLoginProfile": {"iam:UpdateLoginProfile"},
"iam:AttachUserPolicy": {"iam:AttachUserPolicy"},
"iam:AttachGroupPolicy": {"iam:AttachGroupPolicy"},
"iam:AttachRolePolicy": {"iam:AttachRolePolicy"},
"AssumeRole+AttachRolePolicy": {"sts:AssumeRole", "iam:AttachRolePolicy"},
"iam:PutGroupPolicy": {"iam:PutGroupPolicy"},
"iam:PutRolePolicy": {"iam:PutRolePolicy"},
"AssumeRole+PutRolePolicy": {"sts:AssumeRole", "iam:PutRolePolicy"},
"iam:PutUserPolicy": {"iam:PutUserPolicy"},
"iam:AddUserToGroup": {"iam:AddUserToGroup"},
"iam:UpdateAssumeRolePolicy": {"iam:UpdateAssumeRolePolicy"},
"AssumeRole+UpdateAssumeRolePolicy": {
"sts:AssumeRole",
"iam:UpdateAssumeRolePolicy",
},
# TO-DO: We have to handle AssumeRole just if the resource is * and without conditions
# "sts:AssumeRole": {"sts:AssumeRole"},
}
findings = []
# Iterate over all the IAM "Customer Managed" policies
for policy in iam_client.policies:
# Check only custom policies
if policy.type == "Custom":
report = Check_Report_AWS(self.metadata())
report.resource_id = policy.name
report.resource_arn = policy.arn
report.region = iam_client.region
report.resource_tags = policy.tags
report.status = "PASS"
report.status_extended = f"Custom Policy {report.resource_arn} does not allow privilege escalation"
# List of policy actions
allowed_actions = set()
@@ -85,42 +118,74 @@ class iam_policy_allows_privilege_escalation(Check):
if statements["Effect"] == "Allow":
if "Action" in statements:
if type(statements["Action"]) is str:
allowed_actions = {statements["Action"]}
allowed_actions.add(statements["Action"])
if type(statements["Action"]) is list:
allowed_actions = set(statements["Action"])
allowed_actions.update(statements["Action"])
# Recover denied actions
if statements["Effect"] == "Deny":
if "Action" in statements:
if type(statements["Action"]) is str:
denied_actions = {statements["Action"]}
denied_actions.add(statements["Action"])
if type(statements["Action"]) is list:
denied_actions = set(statements["Action"])
denied_actions.update(statements["Action"])
if "NotAction" in statements:
if type(statements["NotAction"]) is str:
denied_not_actions = {statements["NotAction"]}
denied_not_actions.add(statements["NotAction"])
if type(statements["NotAction"]) is list:
denied_not_actions = set(statements["NotAction"])
denied_not_actions.update(statements["NotAction"])
# First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS
left_actions = allowed_actions.difference(denied_actions)
# Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS
if denied_not_actions:
privileged_actions = left_actions.intersection(denied_not_actions)
# If there is no Denied Not Actions
else:
privileged_actions = left_actions
# Finally, check if there is a privilege escalation action within this policy
policy_privilege_escalation_actions = privileged_actions.intersection(
privilege_escalation_iam_actions
)
# First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS
left_actions = allowed_actions.difference(denied_actions)
# Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS
if denied_not_actions:
privileged_actions = left_actions.intersection(
denied_not_actions
)
# If there is no Denied Not Actions
else:
privileged_actions = left_actions
if len(policy_privilege_escalation_actions) == 0:
report.status = "PASS"
report.status_extended = f"Custom Policy {report.resource_arn} does not allow privilege escalation"
else:
report.status = "FAIL"
report.status_extended = f"Custom Policy {report.resource_arn} allows privilege escalation using the following actions: {policy_privilege_escalation_actions}"
# Store all the action's combinations
policies_combination = set()
for values in privilege_escalation_policies_combination.values():
for val in values:
val_set = set()
val_set.add(val)
# Look for specific api:action
if privileged_actions.intersection(val_set) == val_set:
policies_combination.add(val)
# Look for api:*
else:
for permission in privileged_actions:
api = permission.split(":")[0]
api_action = permission.split(":")[1]
if api_action == "*":
if search(api, val):
policies_combination.add(val)
# Check all policies combinations and see if matchs with some combo key
combos = set()
for (
key,
values,
) in privilege_escalation_policies_combination.items():
intersection = policies_combination.intersection(values)
if intersection == values:
combos.add(key)
if len(combos) != 0:
report.status = "FAIL"
policies_affected = ""
for key in combos:
policies_affected += (
str(privilege_escalation_policies_combination[key])
+ " "
)
report.status_extended = f"Custom Policy {report.resource_arn} allows privilege escalation using the following actions: {policies_affected}".rstrip()
findings.append(report)
return findings

View File

@@ -1,4 +1,5 @@
from json import dumps
from re import search
from unittest import mock
from boto3 import client, session
@@ -10,6 +11,75 @@ from prowler.providers.common.models import Audit_Metadata
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012"
# Keep this up-to-date with the check's actions that allows for privilege escalation
privilege_escalation_policies_combination = {
"CreatePolicyVersion": {"iam:CreatePolicyVersion"},
"SetDefaultPolicyVersion": {"iam:SetDefaultPolicyVersion"},
"iam:PassRole": {"iam:PassRole"},
"PassRole+EC2": {
"iam:PassRole",
"ec2:RunInstances",
},
"PassRole+CreateLambda+Invoke": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:InvokeFunction",
},
"PassRole+CreateLambda+ExistingDynamo": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:CreateEventSourceMapping",
},
"PassRole+CreateLambda+NewDynamo": {
"iam:PassRole",
"lambda:CreateFunction",
"lambda:CreateEventSourceMapping",
"dynamodb:CreateTable",
"dynamodb:PutItem",
},
"PassRole+GlueEndpoint": {
"iam:PassRole",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoint",
},
"PassRole+GlueEndpoints": {
"iam:PassRole",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoints",
},
"PassRole+CloudFormation": {
"cloudformation:CreateStack",
"cloudformation:DescribeStacks",
},
"PassRole+DataPipeline": {
"datapipeline:CreatePipeline",
"datapipeline:PutPipelineDefinition",
"datapipeline:ActivatePipeline",
},
"GlueUpdateDevEndpoint": {"glue:UpdateDevEndpoint"},
"GlueUpdateDevEndpoints": {"glue:UpdateDevEndpoint"},
"lambda:UpdateFunctionCode": {"lambda:UpdateFunctionCode"},
"iam:CreateAccessKey": {"iam:CreateAccessKey"},
"iam:CreateLoginProfile": {"iam:CreateLoginProfile"},
"iam:UpdateLoginProfile": {"iam:UpdateLoginProfile"},
"iam:AttachUserPolicy": {"iam:AttachUserPolicy"},
"iam:AttachGroupPolicy": {"iam:AttachGroupPolicy"},
"iam:AttachRolePolicy": {"iam:AttachRolePolicy"},
"AssumeRole+AttachRolePolicy": {"sts:AssumeRole", "iam:AttachRolePolicy"},
"iam:PutGroupPolicy": {"iam:PutGroupPolicy"},
"iam:PutRolePolicy": {"iam:PutRolePolicy"},
"AssumeRole+PutRolePolicy": {"sts:AssumeRole", "iam:PutRolePolicy"},
"iam:PutUserPolicy": {"iam:PutUserPolicy"},
"iam:AddUserToGroup": {"iam:AddUserToGroup"},
"iam:UpdateAssumeRolePolicy": {"iam:UpdateAssumeRolePolicy"},
"AssumeRole+UpdateAssumeRolePolicy": {
"sts:AssumeRole",
"iam:UpdateAssumeRolePolicy",
},
# TO-DO: We have to handle AssumeRole just if the resource is * and without conditions
# "sts:AssumeRole": {"sts:AssumeRole"},
}
class Test_iam_policy_allows_privilege_escalation:
def set_mocked_audit_info(self):
@@ -43,45 +113,45 @@ class Test_iam_policy_allows_privilege_escalation:
return audit_info
@mock_iam
def test_iam_policy_allows_privilege_escalation_sts(self):
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "policy1"
policy_document = {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": "sts:*", "Resource": "*"},
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
# @mock_iam
# def test_iam_policy_allows_privilege_escalation_sts(self):
# iam_client = client("iam", region_name=AWS_REGION)
# policy_name = "policy1"
# policy_document = {
# "Version": "2012-10-17",
# "Statement": [
# {"Effect": "Allow", "Action": "sts:*", "Resource": "*"},
# ],
# }
# policy_arn = iam_client.create_policy(
# PolicyName=policy_name, PolicyDocument=dumps(policy_document)
# )["Policy"]["Arn"]
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.iam.iam_service import IAM
# current_audit_info = self.set_mocked_audit_info()
# from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
# with mock.patch(
# "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
# new=current_audit_info,
# ), mock.patch(
# "prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
# new=IAM(current_audit_info),
# ):
# # Test Check
# from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
# iam_policy_allows_privilege_escalation,
# )
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'sts:*'}}"
)
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
# check = iam_policy_allows_privilege_escalation()
# result = check.execute()
# assert len(result) == 1
# assert result[0].status == "FAIL"
# assert (
# result[0].status_extended
# == f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'sts:AssumeRole'}}"
# )
# assert result[0].resource_id == policy_name
# assert result[0].resource_arn == policy_arn
@mock_iam
def test_iam_policy_not_allows_privilege_escalation(self):
@@ -220,10 +290,481 @@ class Test_iam_policy_allows_privilege_escalation:
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Custom Policy {policy_arn} allows privilege escalation using the following actions: {{'dynamodb:PutItem'}}"
== f"Custom Policy {policy_arn} does not allow privilege escalation"
)
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
@mock_iam
def test_iam_policy_allows_privilege_escalation_iam_all_and_ec2_RunInstances(
self,
):
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "policy1"
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:*",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["ec2:RunInstances"],
"Resource": "*",
},
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
assert search(
f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
assert search("ec2:RunInstances", result[0].status_extended)
@mock_iam
def test_iam_policy_allows_privilege_escalation_iam_PassRole(
self,
):
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "policy1"
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/ecs",
}
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
assert search(
f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@mock_iam
def test_iam_policy_allows_privilege_escalation_two_combinations(
self,
):
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "policy1"
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PassRole",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["ec2:RunInstances"],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["lambda:InvokeFunction"],
"Resource": "*",
},
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
assert search(
f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
assert search("lambda:InvokeFunction", result[0].status_extended)
assert search("lambda:CreateFunction", result[0].status_extended)
assert search("ec2:RunInstances", result[0].status_extended)
@mock_iam
def test_iam_policy_allows_privilege_escalation_iam_PassRole_and_other_actions(
self,
):
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "policy1"
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/ecs",
},
{
"Action": "account:GetAccountInformation",
"Effect": "Allow",
"Resource": "*",
},
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
current_audit_info = self.set_mocked_audit_info()
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
assert search(
f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@mock_iam
def test_iam_policy_allows_privilege_escalation_policies_combination(
self,
):
current_audit_info = self.set_mocked_audit_info()
iam_client = client("iam", region_name=AWS_REGION)
policy_name = "privileged_policy"
for values in privilege_escalation_policies_combination.values():
print(list(values))
# We create a new statement in each loop with the combinations required to allow the privilege escalation
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": list(values),
"Resource": "*",
},
],
}
policy_arn = iam_client.create_policy(
PolicyName=policy_name, PolicyDocument=dumps(policy_document)
)["Policy"]["Arn"]
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_arn == policy_arn
assert search(
f"Custom Policy {policy_arn} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
# Check the actions that allow for privilege escalation
for action in values:
assert search(action, result[0].status_extended)
# Delete each IAM policy after the test
iam_client.delete_policy(PolicyArn=policy_arn)
@mock_iam
def test_iam_policy_allows_privilege_escalation_two_policies_one_good_one_bad(
self,
):
current_audit_info = self.set_mocked_audit_info()
iam_client = client("iam", region_name=AWS_REGION)
policy_name_1 = "privileged_policy_1"
policy_document_1 = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["ec2:RunInstances"],
"Resource": "*",
},
],
}
policy_name_2 = "privileged_policy_2"
policy_document_2 = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PassRole",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["lambda:InvokeFunction"],
"Resource": "*",
},
],
}
policy_arn_1 = iam_client.create_policy(
PolicyName=policy_name_1, PolicyDocument=dumps(policy_document_1)
)["Policy"]["Arn"]
policy_arn_2 = iam_client.create_policy(
PolicyName=policy_name_2, PolicyDocument=dumps(policy_document_2)
)["Policy"]["Arn"]
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 2
for finding in result:
if finding.resource_id == policy_name_1:
assert finding.status == "PASS"
assert finding.resource_arn == policy_arn_1
assert (
finding.status_extended
== f"Custom Policy {policy_arn_1} does not allow privilege escalation"
)
if finding.resource_id == policy_name_2:
assert finding.status == "FAIL"
assert finding.resource_arn == policy_arn_2
assert search(
f"Custom Policy {policy_arn_2} allows privilege escalation using the following actions: ",
finding.status_extended,
)
assert search("iam:PassRole", finding.status_extended)
assert search("lambda:InvokeFunction", finding.status_extended)
assert search("lambda:CreateFunction", finding.status_extended)
@mock_iam
def test_iam_policy_allows_privilege_escalation_two_bad_policies(
self,
):
current_audit_info = self.set_mocked_audit_info()
iam_client = client("iam", region_name=AWS_REGION)
policy_name_1 = "privileged_policy_1"
policy_document_1 = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PassRole",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["ec2:RunInstances"],
"Resource": "*",
},
],
}
policy_name_2 = "privileged_policy_2"
policy_document_2 = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PassRole",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
],
"Resource": "*",
},
{
"Effect": "Allow",
"Action": ["lambda:InvokeFunction"],
"Resource": "*",
},
],
}
policy_arn_1 = iam_client.create_policy(
PolicyName=policy_name_1, PolicyDocument=dumps(policy_document_1)
)["Policy"]["Arn"]
policy_arn_2 = iam_client.create_policy(
PolicyName=policy_name_2, PolicyDocument=dumps(policy_document_2)
)["Policy"]["Arn"]
from prowler.providers.aws.services.iam.iam_service import IAM
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation.iam_client",
new=IAM(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.iam.iam_policy_allows_privilege_escalation.iam_policy_allows_privilege_escalation import (
iam_policy_allows_privilege_escalation,
)
check = iam_policy_allows_privilege_escalation()
result = check.execute()
assert len(result) == 2
for finding in result:
if finding.resource_id == policy_name_1:
assert finding.status == "FAIL"
assert finding.resource_arn == policy_arn_1
assert search(
f"Custom Policy {policy_arn_1} allows privilege escalation using the following actions: ",
finding.status_extended,
)
assert search("iam:PassRole", finding.status_extended)
assert search("ec2:RunInstances", finding.status_extended)
if finding.resource_id == policy_name_2:
assert finding.status == "FAIL"
assert finding.resource_arn == policy_arn_2
assert search(
f"Custom Policy {policy_arn_2} allows privilege escalation using the following actions: ",
finding.status_extended,
)
assert search("iam:PassRole", finding.status_extended)
assert search("lambda:InvokeFunction", finding.status_extended)
assert search("lambda:CreateFunction", finding.status_extended)