diff --git a/providers/aws/services/iam/iam_policy_allows_privilege_escalation/__init__.py b/providers/aws/services/iam/iam_policy_allows_privilege_escalation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.metadata.json b/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.metadata.json new file mode 100644 index 00000000..43b1a72b --- /dev/null +++ b/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "iam_policy_allows_privilege_escalation", + "CheckTitle": "Ensure no Customer Managed IAM policies allow actions that may lead into Privilege Escalation", + "CheckType": "Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark", + "ServiceName": "iam", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "high", + "ResourceType": "AwsIamPolicy", + "Description": "Ensure no Customer Managed IAM policies allow actions that may lead into Privilege Escalation", + "Risk": "Users with some IAM permissions are allowed to elevate their privileges up to administrator rights.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Grant usage permission on a per-resource basis and applying least privilege principle.", + "Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "CAF Security Epic: IAM", + "Compliance": [] +} diff --git a/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py b/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py new file mode 100644 index 00000000..115f9897 --- /dev/null +++ b/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py @@ -0,0 +1,102 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.iam.iam_service import iam_client + +# Does the tool analyze both users and roles, or just one or the other? --> Everything using AttachementCount. +# Does the tool take a principal-centric or policy-centric approach? --> Policy-centric approach. +# Does the tool handle resource constraints? --> We don't check if the policy affects all resources or not, we check everything. +# Does the tool consider the permissions of service roles? --> Just checks policies. +# Does the tool handle transitive privesc paths (i.e., attack chains)? --> Not yet. +# Does the tool handle the DENY effect as expected? --> Yes, it checks DENY's statements with Action and NotAction. +# Does the tool handle NotAction as expected? --> Yes +# Does the tool handle Condition constraints? --> Not yet. +# Does the tool handle service control policy (SCP) restrictions? --> No, SCP are within Organizations AWS API. + + +class iam_policy_allows_privilege_escalation(Check): + def execute(self) -> Check_Report: + privilege_escalation_iam_actions = { + "iam:AttachGroupPolicy", + "iam:SetDefaultPolicyVersion2", + "iam:AddUserToGroup", + "iam:AttachRolePolicy", + "iam:AttachUserPolicy", + "iam:CreateAccessKey", + "iam:CreatePolicyVersion", + "iam:CreateLoginProfile", + "iam:PassRole", + "iam:PutGroupPolicy", + "iam:PutRolePolicy", + "iam:PutUserPolicy", + "iam:SetDefaultPolicyVersion", + "iam:UpdateAssumeRolePolicy", + "iam:UpdateLoginProfile", + "sts:AssumeRole", + "ec2:RunInstances", + "lambda:CreateEventSourceMapping", + "lambda:CreateFunction", + "lambda:InvokeFunction", + "lambda:UpdateFunctionCode", + "dynamodb:CreateTable", + "dynamodb:PutItem", + "glue:CreateDevEndpoint", + "glue:GetDevEndpoint", + "glue:GetDevEndpoints", + "glue:UpdateDevEndpoint", + "cloudformation:CreateStack", + "cloudformation:DescribeStacks", + "datapipeline:CreatePipeline", + "datapipeline:PutPipelineDefinition", + "datapipeline:ActivatePipeline", + } + findings = [] + for policy in iam_client.customer_managed_policies: + report = Check_Report(self.metadata) + report.resource_id = policy["PolicyName"] + report.resource_arn = policy["Arn"] + report.region = iam_client.region + + # List of policy actions + allowed_actions = set() + denied_actions = set() + denied_not_actions = set() + + # Recover all policy actions + for statements in policy["PolicyDocument"]["Statement"]: + # Recover allowed actions + if statements["Effect"] == "Allow": + if type(statements["Action"]) is str: + allowed_actions = {statements["Action"]} + if type(statements["Action"]) is list: + allowed_actions = set(statements["Action"]) + + # Recover denied actions + if statements["Effect"] == "Deny": + if "Action" in statements: + if type(statements["Action"]) is str: + denied_actions = {statements["Action"]} + if type(statements["Action"]) is list: + denied_actions = set(statements["Action"]) + + if "NotAction" in statements: + if type(statements["NotAction"]) is str: + denied_not_actions = {statements["NotAction"]} + if type(statements["NotAction"]) is list: + denied_not_actions = set(statements["NotAction"]) + + # First, we need to perform a left join with ALLOWED_ACTIONS and DENIED_ACTIONS + left_actions = allowed_actions.difference(denied_actions) + # Then, we need to find the DENIED_NOT_ACTIONS in LEFT_ACTIONS + privileged_actions = left_actions.intersection(denied_not_actions) + # Finally, check if there is a privilege escalation action within this policy + policy_privilege_escalation_actions = privileged_actions.intersection( + privilege_escalation_iam_actions + ) + + if len(policy_privilege_escalation_actions) == 0: + report.status = "PASS" + report.status_extended = f"Customer Managed IAM Policy {report.resource_arn} not allows for privilege escalation" + else: + report.status = "FAIL" + report.status_extended = f"Customer Managed IAM Policy {report.resource_arn} allows for privilege escalation using the following actions: {policy_privilege_escalation_actions}" + findings.append(report) + return findings diff --git a/providers/aws/services/iam/iam_service.py b/providers/aws/services/iam/iam_service.py index 366771e2..75768697 100644 --- a/providers/aws/services/iam/iam_service.py +++ b/providers/aws/services/iam/iam_service.py @@ -18,6 +18,7 @@ class IAM: self.account_summary = self.__get_account_summary__() self.virtual_mfa_devices = self.__list_virtual_mfa_devices__() self.customer_managed_policies = self.__get_customer_managed_policies__() + self.__get_customer_managed_policies_version__(self.customer_managed_policies) self.credential_report = self.__get_credential_report__() self.groups = self.__get_groups__() self.__get_group_users__() @@ -83,12 +84,23 @@ class IAM: logger.error(f"{self.region} -- {error.__class__.__name__}: {error}") else: customer_managed_policies = [] + # Use --scope Local to list only Customer Managed Policies for page in get_customer_managed_policies_paginator.paginate(Scope="Local"): for customer_managed_policy in page["Policies"]: customer_managed_policies.append(customer_managed_policy) return customer_managed_policies + def __get_customer_managed_policies_version__(self, customer_managed_policies): + try: + for policy in customer_managed_policies: + response = self.client.get_policy_version( + PolicyArn=policy["Arn"], VersionId=policy["DefaultVersionId"] + ) + policy["PolicyDocument"] = response["PolicyVersion"]["Document"] + except Exception as error: + logger.error(f"{self.region} -- {error.__class__.__name__}: {error}") + def __get_account_summary__(self): try: account_summary = self.client.get_account_summary()