From a795fdc40d74e8f97763c88aabc67f4a7e927c80 Mon Sep 17 00:00:00 2001 From: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Date: Wed, 25 Jan 2023 13:58:58 +0100 Subject: [PATCH] fix(IAM): remove duplicate list_policies function (#1763) Co-authored-by: sergargar --- ...ustom_policy_permissive_role_assumption.py | 16 ++--- .../iam_policy_allows_privilege_escalation.py | 2 +- ...iam_policy_no_administrative_privileges.py | 16 ++--- .../providers/aws/services/iam/iam_service.py | 42 ++---------- .../aws/services/iam/iam_service_test.py | 68 ++----------------- 5 files changed, 25 insertions(+), 119 deletions(-) diff --git a/prowler/providers/aws/services/iam/iam_no_custom_policy_permissive_role_assumption/iam_no_custom_policy_permissive_role_assumption.py b/prowler/providers/aws/services/iam/iam_no_custom_policy_permissive_role_assumption/iam_no_custom_policy_permissive_role_assumption.py index 382d3dba..26185e7e 100644 --- a/prowler/providers/aws/services/iam/iam_no_custom_policy_permissive_role_assumption/iam_no_custom_policy_permissive_role_assumption.py +++ b/prowler/providers/aws/services/iam/iam_no_custom_policy_permissive_role_assumption/iam_no_custom_policy_permissive_role_assumption.py @@ -5,17 +5,17 @@ from prowler.providers.aws.services.iam.iam_client import iam_client class iam_no_custom_policy_permissive_role_assumption(Check): def execute(self) -> Check_Report_AWS: findings = [] - for index, policy_document in enumerate(iam_client.list_policies_version): + for policy in iam_client.policies: report = Check_Report_AWS(self.metadata()) report.region = iam_client.region - report.resource_arn = iam_client.policies[index]["Arn"] - report.resource_id = iam_client.policies[index]["PolicyName"] + report.resource_arn = policy["Arn"] + report.resource_id = policy["PolicyName"] report.status = "PASS" - report.status_extended = f"Custom Policy {iam_client.policies[index]['PolicyName']} does not allow permissive STS Role assumption" - if type(policy_document["Statement"]) != list: - policy_statements = [policy_document["Statement"]] + report.status_extended = f"Custom Policy {policy['PolicyName']} does not allow permissive STS Role assumption" + if type(policy["PolicyDocument"]["Statement"]) != list: + policy_statements = [policy["PolicyDocument"]["Statement"]] else: - policy_statements = policy_document["Statement"] + policy_statements = policy["PolicyDocument"]["Statement"] for statement in policy_statements: if ( statement["Effect"] == "Allow" @@ -28,7 +28,7 @@ class iam_no_custom_policy_permissive_role_assumption(Check): and "*" in statement["Resource"] ): report.status = "FAIL" - report.status_extended = f"Custom Policy {iam_client.policies[index]['PolicyName']} allows permissive STS Role assumption" + report.status_extended = f"Custom Policy {policy['PolicyName']} allows permissive STS Role assumption" break findings.append(report) diff --git a/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py b/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py index 8b2766f0..1589923e 100644 --- a/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py +++ b/prowler/providers/aws/services/iam/iam_policy_allows_privilege_escalation/iam_policy_allows_privilege_escalation.py @@ -60,7 +60,7 @@ class iam_policy_allows_privilege_escalation(Check): "datapipeline:*", } findings = [] - for policy in iam_client.customer_managed_policies: + for policy in iam_client.policies: report = Check_Report_AWS(self.metadata()) report.resource_id = policy["PolicyName"] report.resource_arn = policy["Arn"] diff --git a/prowler/providers/aws/services/iam/iam_policy_no_administrative_privileges/iam_policy_no_administrative_privileges.py b/prowler/providers/aws/services/iam/iam_policy_no_administrative_privileges/iam_policy_no_administrative_privileges.py index abfa02e7..01659133 100644 --- a/prowler/providers/aws/services/iam/iam_policy_no_administrative_privileges/iam_policy_no_administrative_privileges.py +++ b/prowler/providers/aws/services/iam/iam_policy_no_administrative_privileges/iam_policy_no_administrative_privileges.py @@ -5,18 +5,18 @@ from prowler.providers.aws.services.iam.iam_client import iam_client class iam_policy_no_administrative_privileges(Check): def execute(self) -> Check_Report_AWS: findings = [] - for index, policy_document in enumerate(iam_client.list_policies_version): + for policy in iam_client.policies: report = Check_Report_AWS(self.metadata()) report.region = iam_client.region - report.resource_arn = iam_client.policies[index]["Arn"] - report.resource_id = iam_client.policies[index]["PolicyName"] + report.resource_arn = policy["Arn"] + report.resource_id = policy["PolicyName"] report.status = "PASS" - report.status_extended = f"Policy {iam_client.policies[index]['PolicyName']} does not allow '*:*' administrative privileges" + report.status_extended = f"Policy {policy['PolicyName']} does not allow '*:*' administrative privileges" # Check the statements, if one includes *:* stop iterating over the rest - if type(policy_document["Statement"]) != list: - policy_statements = [policy_document["Statement"]] + if type(policy["PolicyDocument"]["Statement"]) != list: + policy_statements = [policy["PolicyDocument"]["Statement"]] else: - policy_statements = policy_document["Statement"] + policy_statements = policy["PolicyDocument"]["Statement"] for statement in policy_statements: if ( statement["Effect"] == "Allow" @@ -25,7 +25,7 @@ class iam_policy_no_administrative_privileges(Check): and "*" in statement["Resource"] ): report.status = "FAIL" - report.status_extended = f"Policy {iam_client.policies[index]['PolicyName']} allows '*:*' administrative privileges" + report.status_extended = f"Policy {policy['PolicyName']} allows '*:*' administrative privileges" break findings.append(report) diff --git a/prowler/providers/aws/services/iam/iam_service.py b/prowler/providers/aws/services/iam/iam_service.py index 3666f6f3..da2d8fc8 100644 --- a/prowler/providers/aws/services/iam/iam_service.py +++ b/prowler/providers/aws/services/iam/iam_service.py @@ -40,8 +40,6 @@ class IAM: self.roles = self.__get_roles__() self.account_summary = self.__get_account_summary__() self.virtual_mfa_devices = self.__list_virtual_mfa_devices__() - self.customer_managed_policies = self.__get_customer_managed_policies__() - self.__get_customer_managed_policies_version__(self.customer_managed_policies) self.credential_report = self.__get_credential_report__() self.groups = self.__get_groups__() self.__get_group_users__() @@ -54,7 +52,7 @@ class IAM: self.__get_entities_attached_to_support_roles__() ) self.policies = self.__list_policies__() - self.list_policies_version = self.__list_policies_version__(self.policies) + self.__list_policies_version__(self.policies) self.saml_providers = self.__list_saml_providers__() self.server_certificates = self.__list_server_certificates__() @@ -118,36 +116,6 @@ class IAM: return groups - def __get_customer_managed_policies__(self): - try: - get_customer_managed_policies_paginator = self.client.get_paginator( - "list_policies" - ) - except Exception as error: - logger.error( - f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" - ) - else: - customer_managed_policies = [] - # Use --scope Local to list only Customer Managed Policies - for page in get_customer_managed_policies_paginator.paginate(Scope="Local"): - for customer_managed_policy in page["Policies"]: - customer_managed_policies.append(customer_managed_policy) - - return customer_managed_policies - - def __get_customer_managed_policies_version__(self, customer_managed_policies): - try: - for policy in customer_managed_policies: - response = self.client.get_policy_version( - PolicyArn=policy["Arn"], VersionId=policy["DefaultVersionId"] - ) - policy["PolicyDocument"] = response["PolicyVersion"]["Document"] - except Exception as error: - logger.error( - f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" - ) - def __get_account_summary__(self): try: account_summary = self.client.get_account_summary() @@ -367,24 +335,22 @@ class IAM: logger.error( f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - finally: + else: return policies def __list_policies_version__(self, policies): try: - policies_version = [] + pass for policy in policies: policy_version = self.client.get_policy_version( PolicyArn=policy["Arn"], VersionId=policy["DefaultVersionId"] ) - policies_version.append(policy_version["PolicyVersion"]["Document"]) + policy["PolicyDocument"] = policy_version["PolicyVersion"]["Document"] except Exception as error: logger.error( f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - finally: - return policies_version def __list_saml_providers__(self): try: diff --git a/tests/providers/aws/services/iam/iam_service_test.py b/tests/providers/aws/services/iam/iam_service_test.py index 2080d371..83069051 100644 --- a/tests/providers/aws/services/iam/iam_service_test.py +++ b/tests/providers/aws/services/iam/iam_service_test.py @@ -1,4 +1,3 @@ -import json from json import dumps from boto3 import client, session @@ -296,65 +295,6 @@ class Test_IAM_Service: iam = IAM(audit_info) assert len(iam.users) == len(iam_client.list_users()["Users"]) - # Test IAM Get Customer Managed Policies - @mock_iam - def test__get_customer_managed_policies__(self): - # Generate IAM Client - iam_client = client("iam") - # Create a new IAM Policy - policy_document = """ -{ - "Version": "2012-10-17", - "Statement": - { - "Effect": "Allow", - "Action": "s3:ListBucket", - "Resource": "arn:aws:s3:::example_bucket" - } -} -""" - iam_client.create_policy( - PolicyName="policy1", - PolicyDocument=policy_document, - ) - # IAM client for this test class - audit_info = self.set_mocked_audit_info() - iam = IAM(audit_info) - assert len(iam.customer_managed_policies) == len( - iam_client.list_policies(Scope="Local")["Policies"] - ) - - # Test IAM Get Customer Managed Policies Version - @mock_iam - def test__get_customer_managed_policies_version__(self): - # Generate IAM Client - iam_client = client("iam") - # Create a new IAM Policy - policy_document = """ -{ - "Version": "2012-10-17", - "Statement": - { - "Effect": "Allow", - "Action": "s3:ListBucket", - "Resource": "arn:aws:s3:::example_bucket" - } -} -""" - iam_client.create_policy( - PolicyName="policy1", - PolicyDocument=policy_document, - ) - - # IAM client for this test class - audit_info = self.set_mocked_audit_info() - iam = IAM(audit_info) - - assert len(iam.customer_managed_policies) == 1 - assert iam.customer_managed_policies[0]["PolicyDocument"] == json.loads( - policy_document - ) - # Test IAM Get Account Summary @mock_iam def test__get_account_summary__(self): @@ -630,10 +570,10 @@ class Test_IAM_Service: audit_info = self.set_mocked_audit_info() iam = IAM(audit_info) - assert len(iam.list_policies_version) == 1 - assert iam.list_policies_version[0]["Statement"][0]["Effect"] == "Allow" - assert iam.list_policies_version[0]["Statement"][0]["Action"] == "*" - assert iam.list_policies_version[0]["Statement"][0]["Resource"] == "*" + assert len(iam.policies) == 1 + assert iam.policies[0]["PolicyDocument"]["Statement"][0]["Effect"] == "Allow" + assert iam.policies[0]["PolicyDocument"]["Statement"][0]["Action"] == "*" + assert iam.policies[0]["PolicyDocument"]["Statement"][0]["Resource"] == "*" # Test IAM List SAML Providers @mock_iam