feat(check): iam-policy-allows-privilege-escalation (#1315)

* feat(check): iam-policy-allows-privilege-escalation

* feat(metadata): Enrich check metadata

Co-authored-by: Toni de la Fuente <toni@blyx.com>

Co-authored-by: Toni de la Fuente <toni@blyx.com>
This commit is contained in:
Pepe Fagoaga
2022-08-04 11:26:42 +02:00
committed by GitHub
parent 5541ec0763
commit 85a6634a56
4 changed files with 149 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "iam_policy_allows_privilege_escalation",
"CheckTitle": "Ensure no Customer Managed IAM policies allow actions that may lead into Privilege Escalation",
"CheckType": "Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark",
"ServiceName": "iam",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsIamPolicy",
"Description": "Ensure no Customer Managed IAM policies allow actions that may lead into Privilege Escalation",
"Risk": "Users with some IAM permissions are allowed to elevate their privileges up to administrator rights.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Grant usage permission on a per-resource basis and applying least privilege principle.",
"Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "CAF Security Epic: IAM",
"Compliance": []
}

View File

@@ -0,0 +1,102 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.iam.iam_service import iam_client
# Does the tool analyze both users and roles, or just one or the other? --> Everything using AttachementCount.
# Does the tool take a principal-centric or policy-centric approach? --> Policy-centric approach.
# Does the tool handle resource constraints? --> We don't check if the policy affects all resources or not, we check everything.
# Does the tool consider the permissions of service roles? --> Just checks policies.
# Does the tool handle transitive privesc paths (i.e., attack chains)? --> Not yet.
# Does the tool handle the DENY effect as expected? --> Yes, it checks DENY's statements with Action and NotAction.
# Does the tool handle NotAction as expected? --> Yes
# Does the tool handle Condition constraints? --> Not yet.
# Does the tool handle service control policy (SCP) restrictions? --> No, SCP are within Organizations AWS API.
class iam_policy_allows_privilege_escalation(Check):
def execute(self) -> Check_Report:
privilege_escalation_iam_actions = {
"iam:AttachGroupPolicy",
"iam:SetDefaultPolicyVersion2",
"iam:AddUserToGroup",
"iam:AttachRolePolicy",
"iam:AttachUserPolicy",
"iam:CreateAccessKey",
"iam:CreatePolicyVersion",
"iam:CreateLoginProfile",
"iam:PassRole",
"iam:PutGroupPolicy",
"iam:PutRolePolicy",
"iam:PutUserPolicy",
"iam:SetDefaultPolicyVersion",
"iam:UpdateAssumeRolePolicy",
"iam:UpdateLoginProfile",
"sts:AssumeRole",
"ec2:RunInstances",
"lambda:CreateEventSourceMapping",
"lambda:CreateFunction",
"lambda:InvokeFunction",
"lambda:UpdateFunctionCode",
"dynamodb:CreateTable",
"dynamodb:PutItem",
"glue:CreateDevEndpoint",
"glue:GetDevEndpoint",
"glue:GetDevEndpoints",
"glue:UpdateDevEndpoint",
"cloudformation:CreateStack",
"cloudformation:DescribeStacks",
"datapipeline:CreatePipeline",
"datapipeline:PutPipelineDefinition",
"datapipeline:ActivatePipeline",
}
findings = []
for policy in iam_client.customer_managed_policies:
report = Check_Report(self.metadata)
report.resource_id = policy["PolicyName"]
report.resource_arn = policy["Arn"]
report.region = iam_client.region
# List of policy actions
allowed_actions = set()
denied_actions = set()
denied_not_actions = set()
# Recover all policy actions
for statements in policy["PolicyDocument"]["Statement"]:
# Recover allowed actions
if statements["Effect"] == "Allow":
if type(statements["Action"]) is str:
allowed_actions = {statements["Action"]}
if type(statements["Action"]) is list:
allowed_actions = set(statements["Action"])
# Recover denied actions
if statements["Effect"] == "Deny":
if "Action" in statements:
if type(statements["Action"]) is str:
denied_actions = {statements["Action"]}
if type(statements["Action"]) is list:
denied_actions = set(statements["Action"])
if "NotAction" in statements:
if type(statements["NotAction"]) is str:
denied_not_actions = {statements["NotAction"]}
if type(statements["NotAction"]) is list:
denied_not_actions = set(statements["NotAction"])
# First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS
left_actions = allowed_actions.difference(denied_actions)
# Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS
privileged_actions = left_actions.intersection(denied_not_actions)
# Finally, check if there is a privilege escalation action within this policy
policy_privilege_escalation_actions = privileged_actions.intersection(
privilege_escalation_iam_actions
)
if len(policy_privilege_escalation_actions) == 0:
report.status = "PASS"
report.status_extended = f"Customer Managed IAM Policy {report.resource_arn} not allows for privilege escalation"
else:
report.status = "FAIL"
report.status_extended = f"Customer Managed IAM Policy {report.resource_arn} allows for privilege escalation using the following actions: {policy_privilege_escalation_actions}"
findings.append(report)
return findings

View File

@@ -18,6 +18,7 @@ class IAM:
self.account_summary = self.__get_account_summary__()
self.virtual_mfa_devices = self.__list_virtual_mfa_devices__()
self.customer_managed_policies = self.__get_customer_managed_policies__()
self.__get_customer_managed_policies_version__(self.customer_managed_policies)
self.credential_report = self.__get_credential_report__()
self.groups = self.__get_groups__()
self.__get_group_users__()
@@ -83,12 +84,23 @@ class IAM:
logger.error(f"{self.region} -- {error.__class__.__name__}: {error}")
else:
customer_managed_policies = []
# Use --scope Local to list only Customer Managed Policies
for page in get_customer_managed_policies_paginator.paginate(Scope="Local"):
for customer_managed_policy in page["Policies"]:
customer_managed_policies.append(customer_managed_policy)
return customer_managed_policies
def __get_customer_managed_policies_version__(self, customer_managed_policies):
try:
for policy in customer_managed_policies:
response = self.client.get_policy_version(
PolicyArn=policy["Arn"], VersionId=policy["DefaultVersionId"]
)
policy["PolicyDocument"] = response["PolicyVersion"]["Document"]
except Exception as error:
logger.error(f"{self.region} -- {error.__class__.__name__}: {error}")
def __get_account_summary__(self):
try:
account_summary = self.client.get_account_summary()