From f7842fdcdde2ac20b7790b33363b354a62e90dce Mon Sep 17 00:00:00 2001 From: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Date: Fri, 28 Oct 2022 12:30:34 +0200 Subject: [PATCH] feat(kms): add service, checks and tests (#1439) --- .../aws/services/{kms => cloudwatch}/check37 | 0 providers/aws/services/kms/__init__.py | 0 providers/aws/services/kms/check28 | 80 --------- providers/aws/services/kms/check_extra7126 | 45 ----- providers/aws/services/kms/check_extra736 | 64 ------- providers/aws/services/kms/kms_client.py | 4 + .../services/kms/kms_cmk_are_used/__init__.py | 0 .../kms_cmk_are_used.metadata.json | 35 ++++ .../kms/kms_cmk_are_used/kms_cmk_are_used.py | 26 +++ .../kms_cmk_are_used/kms_cmk_are_used_test.py | 142 +++++++++++++++ .../kms/kms_cmk_rotation_enabled/__init__.py | 0 .../kms_cmk_rotation_enabled.metadata.json | 35 ++++ .../kms_cmk_rotation_enabled.py | 28 +++ .../kms_cmk_rotation_enabled_test.py | 112 ++++++++++++ .../__init__.py | 0 ..._key_not_publicly_accessible.metadata.json | 35 ++++ .../kms_key_not_publicly_accessible.py | 37 ++++ .../kms_key_not_publicly_accessible_test.py | 128 ++++++++++++++ providers/aws/services/kms/kms_service.py | 120 +++++++++++++ .../aws/services/kms/kms_service_test.py | 162 ++++++++++++++++++ 20 files changed, 864 insertions(+), 189 deletions(-) rename providers/aws/services/{kms => cloudwatch}/check37 (100%) create mode 100644 providers/aws/services/kms/__init__.py delete mode 100644 providers/aws/services/kms/check28 delete mode 100644 providers/aws/services/kms/check_extra7126 delete mode 100644 providers/aws/services/kms/check_extra736 create mode 100644 providers/aws/services/kms/kms_client.py create mode 100644 providers/aws/services/kms/kms_cmk_are_used/__init__.py create mode 100644 providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.metadata.json create mode 100644 providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.py create mode 100644 providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used_test.py create mode 100644 providers/aws/services/kms/kms_cmk_rotation_enabled/__init__.py create mode 100644 providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.metadata.json create mode 100644 providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.py create mode 100644 providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled_test.py create mode 100644 providers/aws/services/kms/kms_key_not_publicly_accessible/__init__.py create mode 100644 providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.metadata.json create mode 100644 providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.py create mode 100644 providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible_test.py create mode 100644 providers/aws/services/kms/kms_service.py create mode 100644 providers/aws/services/kms/kms_service_test.py diff --git a/providers/aws/services/kms/check37 b/providers/aws/services/cloudwatch/check37 similarity index 100% rename from providers/aws/services/kms/check37 rename to providers/aws/services/cloudwatch/check37 diff --git a/providers/aws/services/kms/__init__.py b/providers/aws/services/kms/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/kms/check28 b/providers/aws/services/kms/check28 deleted file mode 100644 index 2770d0a6..00000000 --- a/providers/aws/services/kms/check28 +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_check28="2.8" -CHECK_TITLE_check28="[check28] Ensure rotation for customer created KMS CMKs is enabled" -CHECK_SCORED_check28="SCORED" -CHECK_CIS_LEVEL_check28="LEVEL2" -CHECK_SEVERITY_check28="Medium" -CHECK_ASFF_TYPE_check28="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark" -CHECK_ASFF_RESOURCE_TYPE_check28="AwsKmsKey" -CHECK_ALTERNATE_check208="check28" -CHECK_SERVICENAME_check28="kms" -CHECK_RISK_check28='Cryptographic best practices discourage extensive reuse of encryption keys. Consequently; Customer Master Keys (CMKs) should be rotated to prevent usage of compromised keys.' -CHECK_REMEDIATION_check28='For every KMS Customer Master Keys (CMKs); ensure that Rotate this key every year is enabled.' -CHECK_DOC_check28='https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html' -CHECK_CAF_EPIC_check28='Data Protection' - -check28(){ - # "Ensure rotation for customer created CMKs is enabled (Scored)" - for regx in $REGIONS; do - CHECK_KMS_KEYLIST=$($AWSCLI kms list-keys $PROFILE_OPT --region $regx --output text --query 'Keys[*].KeyId' --output text 2>&1) - if [[ $(echo "$CHECK_KMS_KEYLIST" | grep AccessDenied) ]]; then - textInfo "$regx: Access Denied trying to list keys" "$regx" "$key" - continue - fi - if [[ $CHECK_KMS_KEYLIST ]]; then - cmk_count=0 - for key in $CHECK_KMS_KEYLIST; do - KMSDETAILS=$($AWSCLI kms describe-key --key-id $key $PROFILE_OPT --region $regx --query 'KeyMetadata.{key:KeyId,man:KeyManager,origin:Origin,spec:CustomerMasterKeySpec,state:KeyState}' --output text 2>&1 | grep SYMMETRIC) - if [[ $(echo "$KMSDETAILS" | grep AccessDenied) ]]; then - textInfo "$regx: Access Denied describing key $key" "$regx" "$key" - continue - fi - - KEYID=$(echo $KMSDETAILS | awk '{print $1}') - KEYMANAGER=$(echo $KMSDETAILS | awk '{print $2}') - KEYORIGIN=$(echo $KMSDETAILS | awk '{print $3}') - KEYSTATE=$(echo $KMSDETAILS | awk '{print $5}') - - if [[ "$KEYMANAGER" == "AWS" ]]; then - continue - fi - if [[ "$KEYSTATE" != "Enabled" ]]; then - continue - fi - cmk_count=$((cmk_count + 1)) - - if [[ "$KEYORIGIN" == "EXTERNAL" ]]; then - textPass "$regx: Key $key uses imported key material" "$regx" "$key" - else - CHECK_KMS_KEY_ROTATION=$($AWSCLI kms get-key-rotation-status --key-id $key $PROFILE_OPT --region $regx --output text 2>&1) - if [[ $(echo "$CHECK_KMS_KEY_ROTATION" | grep AccessDenied) ]]; then - textInfo "$regx: Access Denied getting key rotation status for $key " "$regx" "$key" - continue - fi - if [[ "$CHECK_KMS_KEY_ROTATION" == "True" ]];then - textPass "$regx: Key $key automatic rotation of the key material is enabled" "$regx" "$key" - else - textFail "$regx: Key $key automatic rotation of the key material is disabled" "$regx" "$key" - fi - fi - done - if [[ $cmk_count == 0 ]]; then - textInfo "$regx: This region has no customer managed keys" "$regx" "$key" - fi - else - textInfo "$regx: This region has no KMS keys" "$regx" "$key" - fi - done -} diff --git a/providers/aws/services/kms/check_extra7126 b/providers/aws/services/kms/check_extra7126 deleted file mode 100644 index fbd67c62..00000000 --- a/providers/aws/services/kms/check_extra7126 +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -CHECK_ID_extra7126="7.126" -CHECK_TITLE_extra7126="[extra7126] Check if there are CMK KMS keys not used" -CHECK_SCORED_extra7126="NOT_SCORED" -CHECK_CIS_LEVEL_extra7126="EXTRA" -CHECK_SEVERITY_extra7126="Medium" -CHECK_ASFF_RESOURCE_TYPE_extra7126="AwsKmsKey" -CHECK_ALTERNATE_check7126="extra7126" -CHECK_ASFF_COMPLIANCE_TYPE_extra7126="op.exp.11.aws.kms.2" -CHECK_SERVICENAME_extra7126="kms" -CHECK_RISK_extra7126='Unused keys may increase service cost.' -CHECK_REMEDIATION_extra7126='Before deleting a customer master key (CMK); you might want to know how many cipher-texts were encrypted under that key. ' -CHECK_DOC_extra7126='https://docs.aws.amazon.com/kms/latest/developerguide/deleting-keys-determining-usage.html' -CHECK_CAF_EPIC_extra7126='Data Protection' - -extra7126(){ - for regx in $REGIONS; do - LIST_OF_CUSTOMER_KMS_KEYS=$($AWSCLI kms list-aliases $PROFILE_OPT --region $regx --query "Aliases[].[AliasName,TargetKeyId]" --output text |grep -v ^alias/aws/ |awk '{ print $2 }') - if [[ $LIST_OF_CUSTOMER_KMS_KEYS ]];then - for key in $LIST_OF_CUSTOMER_KMS_KEYS; do - CHECK_STATUS=$($AWSCLI kms describe-key --key-id $key $PROFILE_OPT --region $regx --output json | jq -r '.KeyMetadata.KeyState') - if [[ $CHECK_STATUS == "PendingDeletion" ]]; then - textInfo "$regx: KMS key $key is pending deletion" "$regx" - elif [[ $CHECK_STATUS == "Disabled" ]]; then - textInfo "$regx: KMS key $key is disabled" "$regx" "$key" - else - textPass "$regx: KMS key $key is not disabled or pending deletion" "$regx" "$key" - fi - done - else - textInfo "$regx: No KMS keys found" "$regx" - fi - done -} diff --git a/providers/aws/services/kms/check_extra736 b/providers/aws/services/kms/check_extra736 deleted file mode 100644 index 10c42a87..00000000 --- a/providers/aws/services/kms/check_extra736 +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -CHECK_ID_extra736="7.36" -CHECK_TITLE_extra736="[extra736] Check exposed KMS keys" -CHECK_SCORED_extra736="NOT_SCORED" -CHECK_CIS_LEVEL_extra736="EXTRA" -CHECK_SEVERITY_extra736="Critical" -CHECK_ASFF_RESOURCE_TYPE_extra736="AwsKmsKey" -CHECK_ALTERNATE_check736="extra736" -CHECK_ASFF_COMPLIANCE_TYPE_extra736="ens-op.exp.11.aws.kms.2" -CHECK_SERVICENAME_extra736="kms" -CHECK_RISK_extra736='Exposed KMS Keys or wide policy permissions my leave data unprotected.' -CHECK_REMEDIATION_extra736='To determine the full extent of who or what currently has access to a customer master key (CMK) in AWS KMS; you must examine the CMK key policy; all grants that apply to the CMK; and potentially all AWS Identity and Access Management (IAM) policies. You might do this to determine the scope of potential usage of a CMK.' -CHECK_DOC_extra736='https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html' -CHECK_CAF_EPIC_extra736='Data Protection' - -extra736(){ - for regx in $REGIONS; do - local CUSTOMER_MANAGED_KMS_KEYS=() - # First, we need to recover every KMS key - LIST_OF_KMS_KEYS=$($AWSCLI kms list-keys $PROFILE_OPT --region "${regx}" --query "Keys[].[KeyArn]" --output text) - if [[ $LIST_OF_KMS_KEYS ]] - then - # Second, we need to check for Customer Managed KMS keys, with or without a configured alias - for keyID in ${LIST_OF_KMS_KEYS} - do - KMS_KEY_MANAGER=$($AWSCLI kms describe-key $PROFILE_OPT --region "${regx}" --key-id "${keyID}" --query "KeyMetadata.KeyManager" --output text) - if [[ "${KMS_KEY_MANAGER}" == "CUSTOMER" ]] - then - CUSTOMER_MANAGED_KMS_KEYS+=( "${keyID}" ) - fi - done - else - textInfo "${regx}: No KMS keys found" "${regx}" - continue - fi - - # Third, we need to check the policy included in every Customer Managed KMS key - if [[ "${CUSTOMER_MANAGED_KMS_KEYS[*]}" ]] - then - for keyID in "${CUSTOMER_MANAGED_KMS_KEYS[@]}" - do - CHECK_POLICY=$($AWSCLI kms get-key-policy --key-id "${keyID}" --policy-name default $PROFILE_OPT --region "${regx}" --output text | jq '.Statement[]|select(.Effect=="Allow" and (((.Principal|type == "object") and .Principal.AWS == "*") or ((.Principal|type == "string") and .Principal == "*")) and .Condition == null)') - if [[ $CHECK_POLICY ]]; then - textFail "${regx}: KMS key ${keyID} may be publicly accessible!" "${regx}" "${keyID}" - else - textPass "${regx}: KMS key ${keyID} is not exposed to Public" "${regx}" "${keyID}" - fi - done - else - textInfo "${regx}: No Customer Managed KMS keys found" "${regx}" - fi - done -} diff --git a/providers/aws/services/kms/kms_client.py b/providers/aws/services/kms/kms_client.py new file mode 100644 index 00000000..cf3c08fc --- /dev/null +++ b/providers/aws/services/kms/kms_client.py @@ -0,0 +1,4 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.kms.kms_service import KMS + +kms_client = KMS(current_audit_info) diff --git a/providers/aws/services/kms/kms_cmk_are_used/__init__.py b/providers/aws/services/kms/kms_cmk_are_used/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.metadata.json b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.metadata.json new file mode 100644 index 00000000..0bbdcb88 --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "kms_cmk_are_used", + "CheckTitle": "Check if there are CMK KMS keys not used.", + "CheckType": ["Data Protection"], + "ServiceName": "kms", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id", + "Severity": "medium", + "ResourceType": "AwsKmsKey", + "Description": "Check if there are CMK KMS keys not used.", + "Risk": "Unused keys may increase service cost.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws kms schedule-key-deletion --key-id --pending-window-in-days 7", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Before deleting a customer master key (CMK), you might want to know how many cipher-texts were encrypted under that key.", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/deleting-keys-determining-usage.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.py b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.py new file mode 100644 index 00000000..b963f8ef --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used.py @@ -0,0 +1,26 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.kms.kms_client import kms_client + + +class kms_cmk_are_used(Check): + def execute(self): + findings = [] + for key in kms_client.keys: + # Only check CMKs keys + if key.manager == "CUSTOMER": + report = Check_Report(self.metadata) + report.region = key.region + report.resource_id = key.id + report.resource_arn = key.arn + if key.state != "Enabled": + if key.state == "PendingDeletion": + report.status = "PASS" + report.status_extended = f"KMS CMK {key.id} is not being used but it has scheduled deletion." + else: + report.status = "FAIL" + report.status_extended = f"KMS CMK {key.id} is not being used." + else: + report.status = "PASS" + report.status_extended = f"KMS CMK {key.id} is being used." + findings.append(report) + return findings diff --git a/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used_test.py b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used_test.py new file mode 100644 index 00000000..871ba225 --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_are_used/kms_cmk_are_used_test.py @@ -0,0 +1,142 @@ +from unittest import mock + +from boto3 import client +from moto import mock_kms + +AWS_REGION = "us-east-1" + + +class Test_kms_cmk_are_used: + @mock_kms + def test_kms_no_keys(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import ( + kms_cmk_are_used, + ) + + check = kms_cmk_are_used() + result = check.execute() + + assert len(result) == 0 + + @mock_kms + def test_kms_cmk_are_used(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Create enabled KMS key + key = kms_client.create_key()["KeyMetadata"] + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import ( + kms_cmk_are_used, + ) + + check = kms_cmk_are_used() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].status_extended == f"KMS CMK {key['KeyId']} is being used." + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_kms_key_with_deletion(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key with deletion + key = kms_client.create_key()["KeyMetadata"] + kms_client.schedule_key_deletion(KeyId=key["KeyId"]) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import ( + kms_cmk_are_used, + ) + + check = kms_cmk_are_used() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"KMS CMK {key['KeyId']} is not being used but it has scheduled deletion." + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_kms_disabled_key(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key with deletion + key = kms_client.create_key()["KeyMetadata"] + kms_client.disable_key(KeyId=key["KeyId"]) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import ( + kms_cmk_are_used, + ) + + check = kms_cmk_are_used() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"KMS CMK {key['KeyId']} is not being used." + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import ( + kms_cmk_are_used, + ) + + check = kms_cmk_are_used() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/kms/kms_cmk_rotation_enabled/__init__.py b/providers/aws/services/kms/kms_cmk_rotation_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.metadata.json b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.metadata.json new file mode 100644 index 00000000..1c126ea6 --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "kms_cmk_rotation_enabled", + "CheckTitle": "Ensure rotation for customer created KMS CMKs is enabled.", + "CheckType": ["Data Protection"], + "ServiceName": "kms", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id", + "Severity": "medium", + "ResourceType": "AwsKmsKey", + "Description": "Ensure rotation for customer created KMS CMKs is enabled.", + "Risk": "Cryptographic best practices discourage extensive reuse of encryption keys. Consequently, Customer Master Keys (CMKs) should be rotated to prevent usage of compromised keys.", + "RelatedUrl": "https://aws.amazon.com/blogs/security/how-to-get-ready-for-certificate-transparency/", + "Remediation": { + "Code": { + "CLI": "aws kms enable-key-rotation --key-id ", + "NativeIaC": "", + "Other": "", + "Terraform": "https://docs.bridgecrew.io/docs/ensure-kms-have-rotation-policy#terraform" + }, + "Recommendation": { + "Text": "For every KMS Customer Master Keys (CMKs), ensure that Rotate this key every year is enabled.", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.py b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.py new file mode 100644 index 00000000..d695417f --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled.py @@ -0,0 +1,28 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.kms.kms_client import kms_client + + +class kms_cmk_rotation_enabled(Check): + def execute(self): + findings = [] + for key in kms_client.keys: + report = Check_Report(self.metadata) + report.region = key.region + # Only check enabled CMKs keys + if key.manager == "CUSTOMER" and key.state == "Enabled": + if key.rotation_enabled: + report.status = "PASS" + report.status_extended = ( + f"KMS CMK {key.id} has automatic rotation enabled." + ) + report.resource_id = key.id + report.resource_arn = key.arn + else: + report.status = "FAIL" + report.status_extended = ( + f"KMS CMK {key.id} has automatic rotation disabled." + ) + report.resource_id = key.id + report.resource_arn = key.arn + findings.append(report) + return findings diff --git a/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled_test.py b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled_test.py new file mode 100644 index 00000000..b2620aec --- /dev/null +++ b/providers/aws/services/kms/kms_cmk_rotation_enabled/kms_cmk_rotation_enabled_test.py @@ -0,0 +1,112 @@ +from unittest import mock + +from boto3 import client +from moto import mock_kms + +AWS_REGION = "us-east-1" + + +class Test_kms_cmk_rotation_enabled: + @mock_kms + def test_kms_no_key(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import ( + kms_cmk_rotation_enabled, + ) + + check = kms_cmk_rotation_enabled() + result = check.execute() + + assert len(result) == 0 + + @mock_kms + def test_kms_cmk_rotation_enabled(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key with rotation + key = kms_client.create_key()["KeyMetadata"] + kms_client.enable_key_rotation(KeyId=key["KeyId"]) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import ( + kms_cmk_rotation_enabled, + ) + + check = kms_cmk_rotation_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"KMS CMK {key['KeyId']} has automatic rotation enabled." + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_kms_cmk_rotation_disabled(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key without rotation + key = kms_client.create_key()["KeyMetadata"] + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import ( + kms_cmk_rotation_enabled, + ) + + check = kms_cmk_rotation_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"KMS CMK {key['KeyId']} has automatic rotation disabled." + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import ( + kms_cmk_rotation_enabled, + ) + + check = kms_cmk_rotation_enabled() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/kms/kms_key_not_publicly_accessible/__init__.py b/providers/aws/services/kms/kms_key_not_publicly_accessible/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.metadata.json b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.metadata.json new file mode 100644 index 00000000..87863691 --- /dev/null +++ b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "kms_key_not_publicly_accessible", + "CheckTitle": "Check exposed KMS keys", + "CheckType": ["Data Protection"], + "ServiceName": "kms", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id", + "Severity": "medium", + "ResourceType": "AwsKmsKey", + "Description": "Check exposed KMS keys", + "Risk": "Exposed KMS Keys or wide policy permissions my leave data unprotected.", + "RelatedUrl": "https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://github.com/cloudmatos/matos/tree/master/remediations/aws/kms/exposed-key", + "Terraform": "" + }, + "Recommendation": { + "Text": "To determine the full extent of who or what currently has access to a customer master key (CMK) in AWS KMS, you must examine the CMK key policy, all grants that apply to the CMK and potentially all AWS Identity and Access Management (IAM) policies. You might do this to determine the scope of potential usage of a CMK.", + "Url": "https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.py b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.py new file mode 100644 index 00000000..6577a471 --- /dev/null +++ b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible.py @@ -0,0 +1,37 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.kms.kms_client import kms_client + + +class kms_key_not_publicly_accessible(Check): + def execute(self): + findings = [] + for key in kms_client.keys: + if ( + key.manager == "CUSTOMER" and key.state == "Enabled" + ): # only customer KMS have policies + report = Check_Report(self.metadata) + report.status = "PASS" + report.status_extended = f"KMS key {key.id} is not exposed to Public." + report.resource_id = key.id + report.resource_arn = key.arn + report.region = key.region + # If the "Principal" element value is set to { "AWS": "*" } and the policy statement is not using any Condition clauses to filter the access, the selected AWS KMS master key is publicly accessible. + for statement in key.policy["Statement"]: + if "*" == statement["Principal"] and not "Condition" in statement: + report.status = "FAIL" + report.status_extended = ( + f"KMS key {key.id} may be publicly accessible!" + ) + else: + if type(statement["Principal"]["AWS"]) == str: + principals = [statement["Principal"]["AWS"]] + else: + principals = statement["Principal"]["AWS"] + for principal_arn in principals: + if principal_arn == "*" and not "Condition" in statement: + report.status = "FAIL" + report.status_extended = ( + f"KMS key {key.id} may be publicly accessible!" + ) + findings.append(report) + return findings diff --git a/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible_test.py b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible_test.py new file mode 100644 index 00000000..2640b278 --- /dev/null +++ b/providers/aws/services/kms/kms_key_not_publicly_accessible/kms_key_not_publicly_accessible_test.py @@ -0,0 +1,128 @@ +import json +from unittest import mock + +from boto3 import client +from moto import mock_kms + +AWS_REGION = "us-east-1" + + +class Test_kms_key_not_publicly_accessible: + @mock_kms + def test_no_kms_keys(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import ( + kms_key_not_publicly_accessible, + ) + + check = kms_key_not_publicly_accessible() + result = check.execute() + + assert len(result) == 0 + + @mock_kms + def test_kms_key_not_publicly_accessible(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key without policy + key = kms_client.create_key()["KeyMetadata"] + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import ( + kms_key_not_publicly_accessible, + ) + + check = kms_key_not_publicly_accessible() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"KMS key {key['KeyId']} is not exposed to Public." + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_kms_key_public_accessible(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Creaty KMS key with public policy + key = kms_client.create_key( + Policy=json.dumps( + { + "Version": "2012-10-17", + "Id": "key-default-1", + "Statement": [ + { + "Sid": "Enable IAM User Permissions", + "Effect": "Allow", + "Principal": "*", + "Action": "kms:*", + "Resource": "*", + } + ], + } + ) + )["KeyMetadata"] + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.kms.kms_service import KMS + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client", + new=KMS(current_audit_info), + ): + # Test Check + from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import ( + kms_key_not_publicly_accessible, + ) + + check = kms_key_not_publicly_accessible() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"KMS key {key['KeyId']} may be publicly accessible!" + ) + assert result[0].resource_id == key["KeyId"] + assert result[0].resource_arn == key["Arn"] + + @mock_kms + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import ( + kms_key_not_publicly_accessible, + ) + + check = kms_key_not_publicly_accessible() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/kms/kms_service.py b/providers/aws/services/kms/kms_service.py new file mode 100644 index 00000000..42ed75ae --- /dev/null +++ b/providers/aws/services/kms/kms_service.py @@ -0,0 +1,120 @@ +import json +import threading +from dataclasses import dataclass + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################## KMS +class KMS: + def __init__(self, audit_info): + self.service = "kms" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.keys = [] + self.__threading_call__(self.__list_keys__) + self.__describe_key__() + self.__get_key_rotation_status__() + self.__get_key_policy__() + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __list_keys__(self, regional_client): + logger.info("KMS - Listing Keys...") + try: + list_keys_paginator = regional_client.get_paginator("list_keys") + for page in list_keys_paginator.paginate(): + for key in page["Keys"]: + self.keys.append( + Key( + key["KeyId"], + key["KeyArn"], + regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + def __describe_key__(self): + logger.info("KMS - Describing Key...") + try: + for key in self.keys: + regional_client = self.regional_clients[key.region] + response = regional_client.describe_key(KeyId=key.id) + key.state = response["KeyMetadata"]["KeyState"] + key.origin = response["KeyMetadata"]["Origin"] + key.manager = response["KeyMetadata"]["KeyManager"] + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + def __get_key_rotation_status__(self): + logger.info("KMS - Get Key Rotation Status...") + for key in self.keys: + try: + regional_client = self.regional_clients[key.region] + key.rotation_enabled = regional_client.get_key_rotation_status( + KeyId=key.id + )["KeyRotationEnabled"] + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + def __get_key_policy__(self): + logger.info("KMS - Get Key Policy...") + for key in self.keys: + try: + if key.manager == "CUSTOMER": # only customer KMS have policies + regional_client = self.regional_clients[key.region] + key.policy = json.loads( + regional_client.get_key_policy( + KeyId=key.id, PolicyName="default" + )["Policy"] + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + +@dataclass +class Key: + id: str + arn: str + state: str + origin: str + manager: str + rotation_enabled: bool + policy: dict + region: str + + def __init__( + self, + id, + arn, + region, + ): + self.id = id + self.arn = arn + self.state = None + self.origin = None + self.manager = None + self.rotation_enabled = False + self.policy = {} + self.region = region diff --git a/providers/aws/services/kms/kms_service_test.py b/providers/aws/services/kms/kms_service_test.py new file mode 100644 index 00000000..cf0deb37 --- /dev/null +++ b/providers/aws/services/kms/kms_service_test.py @@ -0,0 +1,162 @@ +import json + +from boto3 import client, session +from moto import mock_kms + +from providers.aws.lib.audit_info.models import AWS_Audit_Info +from providers.aws.services.kms.kms_service import KMS + +AWS_ACCOUNT_NUMBER = 123456789012 +AWS_REGION = "us-east-1" + + +class Test_ACM_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + ) + return audit_info + + # Test KMS Service + @mock_kms + def test_service(self): + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert kms.service == "kms" + + # Test KMS Client + @mock_kms + def test_client(self): + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + for client in kms.regional_clients.values(): + assert client.__class__.__name__ == "KMS" + + # Test KMS Session + @mock_kms + def test__get_session__(self): + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert kms.session.__class__.__name__ == "Session" + + # Test KMS Session + @mock_kms + def test_audited_account(self): + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert kms.audited_account == AWS_ACCOUNT_NUMBER + + # Test KMS List Keys + @mock_kms + def test__list_keys__(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Create KMS keys + key1 = kms_client.create_key()["KeyMetadata"] + key2 = kms_client.create_key()["KeyMetadata"] + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert len(kms.keys) == 2 + assert kms.keys[0].arn == key1["Arn"] + assert kms.keys[1].arn == key2["Arn"] + + # Test KMS Describe Keys + @mock_kms + def test__describe_key__(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Create KMS keys + key1 = kms_client.create_key()["KeyMetadata"] + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert len(kms.keys) == 1 + assert kms.keys[0].arn == key1["Arn"] + assert kms.keys[0].state == key1["KeyState"] + assert kms.keys[0].origin == key1["Origin"] + assert kms.keys[0].manager == key1["KeyManager"] + + # Test KMS Get rotation status + @mock_kms + def test__get_key_rotation_status__(self): + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Create KMS keys + key1 = kms_client.create_key()["KeyMetadata"] + key2 = kms_client.create_key()["KeyMetadata"] + kms_client.enable_key_rotation(KeyId=key2["KeyId"]) + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert len(kms.keys) == 2 + assert kms.keys[0].arn == key1["Arn"] + assert kms.keys[0].rotation_enabled == False + assert kms.keys[1].arn == key2["Arn"] + assert kms.keys[1].rotation_enabled == True + + # Test KMS Key policy + @mock_kms + def test__get_key_policy__(self): + public_policy = json.dumps( + { + "Version": "2012-10-17", + "Id": "key-default-1", + "Statement": [ + { + "Sid": "Enable IAM User Permissions", + "Effect": "Allow", + "Principal": "*", + "Action": "kms:*", + "Resource": "*", + } + ], + } + ) + default_policy = json.dumps( + { + "Version": "2012-10-17", + "Id": "key-default-1", + "Statement": [ + { + "Sid": "Enable IAM User Permissions", + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::123456789012:root"}, + "Action": "kms:*", + "Resource": "*", + } + ], + } + ) + # Generate KMS Client + kms_client = client("kms", region_name=AWS_REGION) + # Create KMS keys + key1 = kms_client.create_key(Policy=default_policy)["KeyMetadata"] + key2 = kms_client.create_key(Policy=public_policy)["KeyMetadata"] + # KMS client for this test class + audit_info = self.set_mocked_audit_info() + kms = KMS(audit_info) + assert len(kms.keys) == 2 + assert kms.keys[0].arn == key1["Arn"] + assert kms.keys[0].policy == json.loads(default_policy) + assert kms.keys[1].arn == key2["Arn"] + assert kms.keys[1].policy == json.loads(public_policy)