feat(kms): add service, checks and tests (#1439)

This commit is contained in:
Sergio Garcia
2022-10-28 12:30:34 +02:00
committed by GitHub
parent b2976984d3
commit f7842fdcdd
20 changed files with 864 additions and 189 deletions

View File

View File

@@ -1,80 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check28="2.8"
CHECK_TITLE_check28="[check28] Ensure rotation for customer created KMS CMKs is enabled"
CHECK_SCORED_check28="SCORED"
CHECK_CIS_LEVEL_check28="LEVEL2"
CHECK_SEVERITY_check28="Medium"
CHECK_ASFF_TYPE_check28="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check28="AwsKmsKey"
CHECK_ALTERNATE_check208="check28"
CHECK_SERVICENAME_check28="kms"
CHECK_RISK_check28='Cryptographic best practices discourage extensive reuse of encryption keys. Consequently; Customer Master Keys (CMKs) should be rotated to prevent usage of compromised keys.'
CHECK_REMEDIATION_check28='For every KMS Customer Master Keys (CMKs); ensure that Rotate this key every year is enabled.'
CHECK_DOC_check28='https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html'
CHECK_CAF_EPIC_check28='Data Protection'
check28(){
# "Ensure rotation for customer created CMKs is enabled (Scored)"
for regx in $REGIONS; do
CHECK_KMS_KEYLIST=$($AWSCLI kms list-keys $PROFILE_OPT --region $regx --output text --query 'Keys[*].KeyId' --output text 2>&1)
if [[ $(echo "$CHECK_KMS_KEYLIST" | grep AccessDenied) ]]; then
textInfo "$regx: Access Denied trying to list keys" "$regx" "$key"
continue
fi
if [[ $CHECK_KMS_KEYLIST ]]; then
cmk_count=0
for key in $CHECK_KMS_KEYLIST; do
KMSDETAILS=$($AWSCLI kms describe-key --key-id $key $PROFILE_OPT --region $regx --query 'KeyMetadata.{key:KeyId,man:KeyManager,origin:Origin,spec:CustomerMasterKeySpec,state:KeyState}' --output text 2>&1 | grep SYMMETRIC)
if [[ $(echo "$KMSDETAILS" | grep AccessDenied) ]]; then
textInfo "$regx: Access Denied describing key $key" "$regx" "$key"
continue
fi
KEYID=$(echo $KMSDETAILS | awk '{print $1}')
KEYMANAGER=$(echo $KMSDETAILS | awk '{print $2}')
KEYORIGIN=$(echo $KMSDETAILS | awk '{print $3}')
KEYSTATE=$(echo $KMSDETAILS | awk '{print $5}')
if [[ "$KEYMANAGER" == "AWS" ]]; then
continue
fi
if [[ "$KEYSTATE" != "Enabled" ]]; then
continue
fi
cmk_count=$((cmk_count + 1))
if [[ "$KEYORIGIN" == "EXTERNAL" ]]; then
textPass "$regx: Key $key uses imported key material" "$regx" "$key"
else
CHECK_KMS_KEY_ROTATION=$($AWSCLI kms get-key-rotation-status --key-id $key $PROFILE_OPT --region $regx --output text 2>&1)
if [[ $(echo "$CHECK_KMS_KEY_ROTATION" | grep AccessDenied) ]]; then
textInfo "$regx: Access Denied getting key rotation status for $key " "$regx" "$key"
continue
fi
if [[ "$CHECK_KMS_KEY_ROTATION" == "True" ]];then
textPass "$regx: Key $key automatic rotation of the key material is enabled" "$regx" "$key"
else
textFail "$regx: Key $key automatic rotation of the key material is disabled" "$regx" "$key"
fi
fi
done
if [[ $cmk_count == 0 ]]; then
textInfo "$regx: This region has no customer managed keys" "$regx" "$key"
fi
else
textInfo "$regx: This region has no KMS keys" "$regx" "$key"
fi
done
}

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7126="7.126"
CHECK_TITLE_extra7126="[extra7126] Check if there are CMK KMS keys not used"
CHECK_SCORED_extra7126="NOT_SCORED"
CHECK_CIS_LEVEL_extra7126="EXTRA"
CHECK_SEVERITY_extra7126="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7126="AwsKmsKey"
CHECK_ALTERNATE_check7126="extra7126"
CHECK_ASFF_COMPLIANCE_TYPE_extra7126="op.exp.11.aws.kms.2"
CHECK_SERVICENAME_extra7126="kms"
CHECK_RISK_extra7126='Unused keys may increase service cost.'
CHECK_REMEDIATION_extra7126='Before deleting a customer master key (CMK); you might want to know how many cipher-texts were encrypted under that key. '
CHECK_DOC_extra7126='https://docs.aws.amazon.com/kms/latest/developerguide/deleting-keys-determining-usage.html'
CHECK_CAF_EPIC_extra7126='Data Protection'
extra7126(){
for regx in $REGIONS; do
LIST_OF_CUSTOMER_KMS_KEYS=$($AWSCLI kms list-aliases $PROFILE_OPT --region $regx --query "Aliases[].[AliasName,TargetKeyId]" --output text |grep -v ^alias/aws/ |awk '{ print $2 }')
if [[ $LIST_OF_CUSTOMER_KMS_KEYS ]];then
for key in $LIST_OF_CUSTOMER_KMS_KEYS; do
CHECK_STATUS=$($AWSCLI kms describe-key --key-id $key $PROFILE_OPT --region $regx --output json | jq -r '.KeyMetadata.KeyState')
if [[ $CHECK_STATUS == "PendingDeletion" ]]; then
textInfo "$regx: KMS key $key is pending deletion" "$regx"
elif [[ $CHECK_STATUS == "Disabled" ]]; then
textInfo "$regx: KMS key $key is disabled" "$regx" "$key"
else
textPass "$regx: KMS key $key is not disabled or pending deletion" "$regx" "$key"
fi
done
else
textInfo "$regx: No KMS keys found" "$regx"
fi
done
}

View File

@@ -1,64 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra736="7.36"
CHECK_TITLE_extra736="[extra736] Check exposed KMS keys"
CHECK_SCORED_extra736="NOT_SCORED"
CHECK_CIS_LEVEL_extra736="EXTRA"
CHECK_SEVERITY_extra736="Critical"
CHECK_ASFF_RESOURCE_TYPE_extra736="AwsKmsKey"
CHECK_ALTERNATE_check736="extra736"
CHECK_ASFF_COMPLIANCE_TYPE_extra736="ens-op.exp.11.aws.kms.2"
CHECK_SERVICENAME_extra736="kms"
CHECK_RISK_extra736='Exposed KMS Keys or wide policy permissions my leave data unprotected.'
CHECK_REMEDIATION_extra736='To determine the full extent of who or what currently has access to a customer master key (CMK) in AWS KMS; you must examine the CMK key policy; all grants that apply to the CMK; and potentially all AWS Identity and Access Management (IAM) policies. You might do this to determine the scope of potential usage of a CMK.'
CHECK_DOC_extra736='https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html'
CHECK_CAF_EPIC_extra736='Data Protection'
extra736(){
for regx in $REGIONS; do
local CUSTOMER_MANAGED_KMS_KEYS=()
# First, we need to recover every KMS key
LIST_OF_KMS_KEYS=$($AWSCLI kms list-keys $PROFILE_OPT --region "${regx}" --query "Keys[].[KeyArn]" --output text)
if [[ $LIST_OF_KMS_KEYS ]]
then
# Second, we need to check for Customer Managed KMS keys, with or without a configured alias
for keyID in ${LIST_OF_KMS_KEYS}
do
KMS_KEY_MANAGER=$($AWSCLI kms describe-key $PROFILE_OPT --region "${regx}" --key-id "${keyID}" --query "KeyMetadata.KeyManager" --output text)
if [[ "${KMS_KEY_MANAGER}" == "CUSTOMER" ]]
then
CUSTOMER_MANAGED_KMS_KEYS+=( "${keyID}" )
fi
done
else
textInfo "${regx}: No KMS keys found" "${regx}"
continue
fi
# Third, we need to check the policy included in every Customer Managed KMS key
if [[ "${CUSTOMER_MANAGED_KMS_KEYS[*]}" ]]
then
for keyID in "${CUSTOMER_MANAGED_KMS_KEYS[@]}"
do
CHECK_POLICY=$($AWSCLI kms get-key-policy --key-id "${keyID}" --policy-name default $PROFILE_OPT --region "${regx}" --output text | jq '.Statement[]|select(.Effect=="Allow" and (((.Principal|type == "object") and .Principal.AWS == "*") or ((.Principal|type == "string") and .Principal == "*")) and .Condition == null)')
if [[ $CHECK_POLICY ]]; then
textFail "${regx}: KMS key ${keyID} may be publicly accessible!" "${regx}" "${keyID}"
else
textPass "${regx}: KMS key ${keyID} is not exposed to Public" "${regx}" "${keyID}"
fi
done
else
textInfo "${regx}: No Customer Managed KMS keys found" "${regx}"
fi
done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
kms_client = KMS(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "kms_cmk_are_used",
"CheckTitle": "Check if there are CMK KMS keys not used.",
"CheckType": ["Data Protection"],
"ServiceName": "kms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsKmsKey",
"Description": "Check if there are CMK KMS keys not used.",
"Risk": "Unused keys may increase service cost.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "aws kms schedule-key-deletion --key-id <key_id> --pending-window-in-days 7",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Before deleting a customer master key (CMK), you might want to know how many cipher-texts were encrypted under that key.",
"Url": "https://docs.aws.amazon.com/kms/latest/developerguide/deleting-keys-determining-usage.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,26 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.kms.kms_client import kms_client
class kms_cmk_are_used(Check):
def execute(self):
findings = []
for key in kms_client.keys:
# Only check CMKs keys
if key.manager == "CUSTOMER":
report = Check_Report(self.metadata)
report.region = key.region
report.resource_id = key.id
report.resource_arn = key.arn
if key.state != "Enabled":
if key.state == "PendingDeletion":
report.status = "PASS"
report.status_extended = f"KMS CMK {key.id} is not being used but it has scheduled deletion."
else:
report.status = "FAIL"
report.status_extended = f"KMS CMK {key.id} is not being used."
else:
report.status = "PASS"
report.status_extended = f"KMS CMK {key.id} is being used."
findings.append(report)
return findings

View File

@@ -0,0 +1,142 @@
from unittest import mock
from boto3 import client
from moto import mock_kms
AWS_REGION = "us-east-1"
class Test_kms_cmk_are_used:
@mock_kms
def test_kms_no_keys(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
kms_cmk_are_used,
)
check = kms_cmk_are_used()
result = check.execute()
assert len(result) == 0
@mock_kms
def test_kms_cmk_are_used(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Create enabled KMS key
key = kms_client.create_key()["KeyMetadata"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
kms_cmk_are_used,
)
check = kms_cmk_are_used()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == f"KMS CMK {key['KeyId']} is being used."
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_kms_key_with_deletion(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key with deletion
key = kms_client.create_key()["KeyMetadata"]
kms_client.schedule_key_deletion(KeyId=key["KeyId"])
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
kms_cmk_are_used,
)
check = kms_cmk_are_used()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"KMS CMK {key['KeyId']} is not being used but it has scheduled deletion."
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_kms_disabled_key(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key with deletion
key = kms_client.create_key()["KeyMetadata"]
kms_client.disable_key(KeyId=key["KeyId"])
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
kms_cmk_are_used,
)
check = kms_cmk_are_used()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"KMS CMK {key['KeyId']} is not being used."
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used.kms_client",
new=mock_client,
):
# Test Check
from providers.aws.services.kms.kms_cmk_are_used.kms_cmk_are_used import (
kms_cmk_are_used,
)
check = kms_cmk_are_used()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "kms_cmk_rotation_enabled",
"CheckTitle": "Ensure rotation for customer created KMS CMKs is enabled.",
"CheckType": ["Data Protection"],
"ServiceName": "kms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsKmsKey",
"Description": "Ensure rotation for customer created KMS CMKs is enabled.",
"Risk": "Cryptographic best practices discourage extensive reuse of encryption keys. Consequently, Customer Master Keys (CMKs) should be rotated to prevent usage of compromised keys.",
"RelatedUrl": "https://aws.amazon.com/blogs/security/how-to-get-ready-for-certificate-transparency/",
"Remediation": {
"Code": {
"CLI": "aws kms enable-key-rotation --key-id <key_id>",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-kms-have-rotation-policy#terraform"
},
"Recommendation": {
"Text": "For every KMS Customer Master Keys (CMKs), ensure that Rotate this key every year is enabled.",
"Url": "https://docs.aws.amazon.com/kms/latest/developerguide/rotate-keys.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,28 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.kms.kms_client import kms_client
class kms_cmk_rotation_enabled(Check):
def execute(self):
findings = []
for key in kms_client.keys:
report = Check_Report(self.metadata)
report.region = key.region
# Only check enabled CMKs keys
if key.manager == "CUSTOMER" and key.state == "Enabled":
if key.rotation_enabled:
report.status = "PASS"
report.status_extended = (
f"KMS CMK {key.id} has automatic rotation enabled."
)
report.resource_id = key.id
report.resource_arn = key.arn
else:
report.status = "FAIL"
report.status_extended = (
f"KMS CMK {key.id} has automatic rotation disabled."
)
report.resource_id = key.id
report.resource_arn = key.arn
findings.append(report)
return findings

View File

@@ -0,0 +1,112 @@
from unittest import mock
from boto3 import client
from moto import mock_kms
AWS_REGION = "us-east-1"
class Test_kms_cmk_rotation_enabled:
@mock_kms
def test_kms_no_key(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import (
kms_cmk_rotation_enabled,
)
check = kms_cmk_rotation_enabled()
result = check.execute()
assert len(result) == 0
@mock_kms
def test_kms_cmk_rotation_enabled(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key with rotation
key = kms_client.create_key()["KeyMetadata"]
kms_client.enable_key_rotation(KeyId=key["KeyId"])
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import (
kms_cmk_rotation_enabled,
)
check = kms_cmk_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"KMS CMK {key['KeyId']} has automatic rotation enabled."
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_kms_cmk_rotation_disabled(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key without rotation
key = kms_client.create_key()["KeyMetadata"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import (
kms_cmk_rotation_enabled,
)
check = kms_cmk_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"KMS CMK {key['KeyId']} has automatic rotation disabled."
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled.kms_client",
new=mock_client,
):
# Test Check
from providers.aws.services.kms.kms_cmk_rotation_enabled.kms_cmk_rotation_enabled import (
kms_cmk_rotation_enabled,
)
check = kms_cmk_rotation_enabled()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "kms_key_not_publicly_accessible",
"CheckTitle": "Check exposed KMS keys",
"CheckType": ["Data Protection"],
"ServiceName": "kms",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:kms:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsKmsKey",
"Description": "Check exposed KMS keys",
"Risk": "Exposed KMS Keys or wide policy permissions my leave data unprotected.",
"RelatedUrl": "https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://github.com/cloudmatos/matos/tree/master/remediations/aws/kms/exposed-key",
"Terraform": ""
},
"Recommendation": {
"Text": "To determine the full extent of who or what currently has access to a customer master key (CMK) in AWS KMS, you must examine the CMK key policy, all grants that apply to the CMK and potentially all AWS Identity and Access Management (IAM) policies. You might do this to determine the scope of potential usage of a CMK.",
"Url": "https://docs.aws.amazon.com/kms/latest/developerguide/determining-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,37 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.kms.kms_client import kms_client
class kms_key_not_publicly_accessible(Check):
def execute(self):
findings = []
for key in kms_client.keys:
if (
key.manager == "CUSTOMER" and key.state == "Enabled"
): # only customer KMS have policies
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = f"KMS key {key.id} is not exposed to Public."
report.resource_id = key.id
report.resource_arn = key.arn
report.region = key.region
# If the "Principal" element value is set to { "AWS": "*" } and the policy statement is not using any Condition clauses to filter the access, the selected AWS KMS master key is publicly accessible.
for statement in key.policy["Statement"]:
if "*" == statement["Principal"] and not "Condition" in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
else:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
if principal_arn == "*" and not "Condition" in statement:
report.status = "FAIL"
report.status_extended = (
f"KMS key {key.id} may be publicly accessible!"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,128 @@
import json
from unittest import mock
from boto3 import client
from moto import mock_kms
AWS_REGION = "us-east-1"
class Test_kms_key_not_publicly_accessible:
@mock_kms
def test_no_kms_keys(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import (
kms_key_not_publicly_accessible,
)
check = kms_key_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
@mock_kms
def test_kms_key_not_publicly_accessible(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key without policy
key = kms_client.create_key()["KeyMetadata"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import (
kms_key_not_publicly_accessible,
)
check = kms_key_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"KMS key {key['KeyId']} is not exposed to Public."
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_kms_key_public_accessible(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Creaty KMS key with public policy
key = kms_client.create_key(
Policy=json.dumps(
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": "*",
"Action": "kms:*",
"Resource": "*",
}
],
}
)
)["KeyMetadata"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.kms.kms_service import KMS
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client",
new=KMS(current_audit_info),
):
# Test Check
from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import (
kms_key_not_publicly_accessible,
)
check = kms_key_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"KMS key {key['KeyId']} may be publicly accessible!"
)
assert result[0].resource_id == key["KeyId"]
assert result[0].resource_arn == key["Arn"]
@mock_kms
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible.kms_client",
new=mock_client,
):
# Test Check
from providers.aws.services.kms.kms_key_not_publicly_accessible.kms_key_not_publicly_accessible import (
kms_key_not_publicly_accessible,
)
check = kms_key_not_publicly_accessible()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,120 @@
import json
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## KMS
class KMS:
def __init__(self, audit_info):
self.service = "kms"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.keys = []
self.__threading_call__(self.__list_keys__)
self.__describe_key__()
self.__get_key_rotation_status__()
self.__get_key_policy__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_keys__(self, regional_client):
logger.info("KMS - Listing Keys...")
try:
list_keys_paginator = regional_client.get_paginator("list_keys")
for page in list_keys_paginator.paginate():
for key in page["Keys"]:
self.keys.append(
Key(
key["KeyId"],
key["KeyArn"],
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __describe_key__(self):
logger.info("KMS - Describing Key...")
try:
for key in self.keys:
regional_client = self.regional_clients[key.region]
response = regional_client.describe_key(KeyId=key.id)
key.state = response["KeyMetadata"]["KeyState"]
key.origin = response["KeyMetadata"]["Origin"]
key.manager = response["KeyMetadata"]["KeyManager"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __get_key_rotation_status__(self):
logger.info("KMS - Get Key Rotation Status...")
for key in self.keys:
try:
regional_client = self.regional_clients[key.region]
key.rotation_enabled = regional_client.get_key_rotation_status(
KeyId=key.id
)["KeyRotationEnabled"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __get_key_policy__(self):
logger.info("KMS - Get Key Policy...")
for key in self.keys:
try:
if key.manager == "CUSTOMER": # only customer KMS have policies
regional_client = self.regional_clients[key.region]
key.policy = json.loads(
regional_client.get_key_policy(
KeyId=key.id, PolicyName="default"
)["Policy"]
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
@dataclass
class Key:
id: str
arn: str
state: str
origin: str
manager: str
rotation_enabled: bool
policy: dict
region: str
def __init__(
self,
id,
arn,
region,
):
self.id = id
self.arn = arn
self.state = None
self.origin = None
self.manager = None
self.rotation_enabled = False
self.policy = {}
self.region = region

View File

@@ -0,0 +1,162 @@
import json
from boto3 import client, session
from moto import mock_kms
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.kms.kms_service import KMS
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
class Test_ACM_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test KMS Service
@mock_kms
def test_service(self):
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert kms.service == "kms"
# Test KMS Client
@mock_kms
def test_client(self):
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
for client in kms.regional_clients.values():
assert client.__class__.__name__ == "KMS"
# Test KMS Session
@mock_kms
def test__get_session__(self):
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert kms.session.__class__.__name__ == "Session"
# Test KMS Session
@mock_kms
def test_audited_account(self):
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert kms.audited_account == AWS_ACCOUNT_NUMBER
# Test KMS List Keys
@mock_kms
def test__list_keys__(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Create KMS keys
key1 = kms_client.create_key()["KeyMetadata"]
key2 = kms_client.create_key()["KeyMetadata"]
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert len(kms.keys) == 2
assert kms.keys[0].arn == key1["Arn"]
assert kms.keys[1].arn == key2["Arn"]
# Test KMS Describe Keys
@mock_kms
def test__describe_key__(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Create KMS keys
key1 = kms_client.create_key()["KeyMetadata"]
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert len(kms.keys) == 1
assert kms.keys[0].arn == key1["Arn"]
assert kms.keys[0].state == key1["KeyState"]
assert kms.keys[0].origin == key1["Origin"]
assert kms.keys[0].manager == key1["KeyManager"]
# Test KMS Get rotation status
@mock_kms
def test__get_key_rotation_status__(self):
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Create KMS keys
key1 = kms_client.create_key()["KeyMetadata"]
key2 = kms_client.create_key()["KeyMetadata"]
kms_client.enable_key_rotation(KeyId=key2["KeyId"])
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert len(kms.keys) == 2
assert kms.keys[0].arn == key1["Arn"]
assert kms.keys[0].rotation_enabled == False
assert kms.keys[1].arn == key2["Arn"]
assert kms.keys[1].rotation_enabled == True
# Test KMS Key policy
@mock_kms
def test__get_key_policy__(self):
public_policy = json.dumps(
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": "*",
"Action": "kms:*",
"Resource": "*",
}
],
}
)
default_policy = json.dumps(
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789012:root"},
"Action": "kms:*",
"Resource": "*",
}
],
}
)
# Generate KMS Client
kms_client = client("kms", region_name=AWS_REGION)
# Create KMS keys
key1 = kms_client.create_key(Policy=default_policy)["KeyMetadata"]
key2 = kms_client.create_key(Policy=public_policy)["KeyMetadata"]
# KMS client for this test class
audit_info = self.set_mocked_audit_info()
kms = KMS(audit_info)
assert len(kms.keys) == 2
assert kms.keys[0].arn == key1["Arn"]
assert kms.keys[0].policy == json.loads(default_policy)
assert kms.keys[1].arn == key2["Arn"]
assert kms.keys[1].policy == json.loads(public_policy)