feat(DynamoDB): add DynamoDB service and checks (#1468)

Co-authored-by: sergargar <sergio@verica.io>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2022-11-15 14:21:09 +01:00
committed by GitHub
parent 58bdbadb11
commit 5281d521f4
26 changed files with 882 additions and 171 deletions

View File

@@ -305,6 +305,10 @@ def generate_regional_clients(service: str, audit_info: AWS_Audit_Info) -> dict:
json_regions = data["services"]["cloudwatch"]["regions"][ json_regions = data["services"]["cloudwatch"]["regions"][
audit_info.audited_partition audit_info.audited_partition
] ]
elif service == "dax":
json_regions = data["services"]["dynamodb"]["regions"][
audit_info.audited_partition
]
else: else:
json_regions = data["services"][service]["regions"][ json_regions = data["services"][service]["regions"][
audit_info.audited_partition audit_info.audited_partition

View File

@@ -53,7 +53,6 @@ class ApiGatewayV2:
for api in self.apis: for api in self.apis:
regional_client = self.regional_clients[api.region] regional_client = self.regional_clients[api.region]
authorizers = regional_client.get_authorizers(ApiId=api.id)["Items"] authorizers = regional_client.get_authorizers(ApiId=api.id)["Items"]
print(authorizers)
if authorizers: if authorizers:
api.authorizer = True api.authorizer = True
except Exception as error: except Exception as error:

View File

@@ -53,7 +53,6 @@ def mock_make_api_call(self, operation_name, kwarg):
"StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60" "StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
} }
if operation_name == "DescribeStacks": if operation_name == "DescribeStacks":
print(f"ARGS: {kwarg}")
if "StackName" in kwarg: if "StackName" in kwarg:
return { return {
"Stacks": [ "Stacks": [

View File

@@ -1,47 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7128="7.128"
CHECK_TITLE_extra7128="[extra7128] Check if DynamoDB table has encryption at rest enabled using CMK KMS"
CHECK_SCORED_extra7128="NOT_SCORED"
CHECK_CIS_LEVEL_extra7128="EXTRA"
CHECK_SEVERITY_extra7128="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7128="AwsDynamoDBTable"
CHECK_ALTERNATE_check7128="extra7128"
CHECK_ASFF_COMPLIANCE_TYPE_extra7128="ens-mp.info.3.aws.dyndb.1"
CHECK_SERVICENAME_extra7128="dynamodb"
CHECK_RISK_extra7128='All user data stored in Amazon DynamoDB is fully encrypted at rest. This functionality helps reduce the operational burden and complexity involved in protecting sensitive data.'
CHECK_REMEDIATION_extra7128='Specify an encryption key when you create a new table or switch the encryption keys on an existing table by using the AWS Management Console.'
CHECK_DOC_extra7128='https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EncryptionAtRest.html'
CHECK_CAF_EPIC_extra7128='Data Protection'
extra7128(){
for regx in $REGIONS; do
DDB_TABLES_LIST=$($AWSCLI dynamodb list-tables $PROFILE_OPT --region $regx --output text --query TableNames 2>&1)
if [[ $(echo "$DDB_TABLES_LIST" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to list tables" "$regx"
continue
fi
if [[ $DDB_TABLES_LIST ]]; then
for table in $DDB_TABLES_LIST; do
DDB_TABLE_WITH_KMS=$($AWSCLI dynamodb describe-table --table-name $table $PROFILE_OPT --region $regx --query Table.SSEDescription.SSEType --output text)
if [[ $DDB_TABLE_WITH_KMS == "KMS" ]]; then
textPass "$regx: DynamoDB table $table does have KMS encryption enabled" "$regx" "$table"
else
textInfo "$regx: DynamoDB table $table does have DEFAULT encryption enabled" "$regx" "$table"
fi
done
else
textInfo "$regx: There are no DynamoDB tables" "$regx"
fi
done
}

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra7151="7.151"
CHECK_TITLE_extra7151="[extra7151] Check if DynamoDB tables point-in-time recovery (PITR) is enabled"
CHECK_SCORED_extra7151="NOT_SCORED"
CHECK_CIS_LEVEL_extra7151="EXTRA"
CHECK_SEVERITY_extra7151="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7151="AwsDynamoDbTable"
CHECK_ALTERNATE_check7151="extra7151"
CHECK_SERVICENAME_extra7151="dynamodb"
CHECK_RISK_extra7151='If the DynamoDB Table does not have point-in-time recovery enabled; it is vulnerable to accidental write or delete operations.'
CHECK_REMEDIATION_extra7151='Enable point-in-time recovery; this is not enabled by default.'
CHECK_DOC_extra7151='https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html'
CHECK_CAF_EPIC_extra7151='Data Protection'
extra7151(){
# "Check if DynamoDB tables point-in-time recovery (PITR) is enabled"
for regx in $REGIONS; do
LIST_OF_DYNAMODB_TABLES=$($AWSCLI dynamodb list-tables $PROFILE_OPT --region $regx --query 'TableNames[*]' --output text 2>&1)
if [[ $(echo "$LIST_OF_DYNAMODB_TABLES" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to list tables" "$regx"
continue
fi
if [[ $LIST_OF_DYNAMODB_TABLES ]]; then
for dynamodb_table in $LIST_OF_DYNAMODB_TABLES; do
POINT_IN_TIME_RECOVERY_ENABLED=$($AWSCLI dynamodb describe-continuous-backups $PROFILE_OPT --region $regx --table-name $dynamodb_table | jq '.[].PointInTimeRecoveryDescription | select(.PointInTimeRecoveryStatus=="ENABLED") | .PointInTimeRecoveryStatus')
if [[ $POINT_IN_TIME_RECOVERY_ENABLED ]]; then
textPass "$regx: $dynamodb_table has point-in-time recovery enabled." "$regx" "$dynamodb_table"
else
textFail "$regx: $dynamodb_table does not have point-in-time recovery enabled." "$regx" "$dynamodb_table"
fi
done
else
textInfo "$regx: No DynamoDB tables found" "$regx"
fi
done
}

View File

@@ -1,68 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
# Remediation
#
# https://docs.aws.amazon.com/cli/latest/reference/dax/create-cluster.html
#
# create-cluster
# --cluster-name <value>
# --node-type <value>
# --replication-factor <value>
# --iam-role-arn <value>
# --sse-specification Enabled=true
CHECK_ID_extra7165="7.165"
CHECK_TITLE_extra7165="[extra7165] Check if DynamoDB: DAX Clusters are encrypted at rest"
CHECK_SCORED_extra7165="NOT_SCORED"
CHECK_CIS_LEVEL_extra7165="EXTRA"
CHECK_SEVERITY_extra7165="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7165="AwsDaxCluster"
CHECK_ALTERNATE_check7165="extra7165"
CHECK_SERVICENAME_extra7165="dynamodb"
CHECK_RISK_extra7165="Encryption at rest provides an additional layer of data protection by securing your data from unauthorized access to the underlying storage."
CHECK_REMEDIATION_extra7165="Re-create the cluster to enable encryption at rest if it was not enabled at creation."
CHECK_DOC_extra7165="https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAXEncryptionAtRest.html"
CHECK_CAF_EPIC_extra7165="Data Protection"
extra7165(){
# "Check if DynamoDB: DAX Clusters are encrypted at rest"
for regx in $REGIONS; do
LIST_OF_DAX_CLUSTERS=$($AWSCLI dax describe-clusters $PROFILE_OPT --region $regx --query 'Clusters[*]' --output json 2>&1)
if [[ $(echo "$LIST_OF_DAX_CLUSTERS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe clusters" "$regx"
continue
fi
if [[ $(echo "$LIST_OF_DAX_CLUSTERS" | grep -E 'Could not connect') ]]; then
textInfo "$regx: Service not available in this region" "$regx"
continue
fi
if [[ $LIST_OF_DAX_CLUSTERS && $LIST_OF_DAX_CLUSTERS != '[]' ]]; then
LIST_OF_CLUSTERS_WITH_ENCRYPTION=$(echo "${LIST_OF_DAX_CLUSTERS}" | jq '.[] | select(.SSEDescription.Status=="ENABLED") | {ClusterName}' | jq -r '.ClusterName')
LIST_OF_CLUSTERS_WITHOUT_ENCRYPTION=$(echo "${LIST_OF_DAX_CLUSTERS}" | jq '.[] | select(.SSEDescription.Status=="DISABLED") | {ClusterName}' | jq -r '.ClusterName')
if [[ $LIST_OF_CLUSTERS_WITHOUT_ENCRYPTION ]]; then
for cluster in $LIST_OF_CLUSTERS_WITHOUT_ENCRYPTION; do
textFail "$regx: ${cluster} does not have encryption at rest enabled." "$regx" "${cluster}"
done
fi
if [[ $LIST_OF_CLUSTERS_WITH_ENCRYPTION ]]; then
for cluster in $LIST_OF_CLUSTERS_WITH_ENCRYPTION; do
textPass "$regx: ${cluster} has encryption at rest enabled." "$regx" "${cluster}"
done
fi
else
textInfo "$regx: No DynamoDB: DAX Clusters found." "$regx"
fi
done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DAX
dax_client = DAX(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "dynamodb_accelerator_cluster_encryption_enabled",
"CheckTitle": "Check if DynamoDB DAX Clusters are encrypted at rest.",
"CheckType": ["Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"],
"ServiceName": "dynamodb",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:dynamodb:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsDaxCluster",
"Description": "Check if DynamoDB DAX Clusters are encrypted at rest.",
"Risk": "Encryption at rest provides an additional layer of data protection by securing your data from unauthorized access to the underlying storage.",
"RelatedUrl": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAXEncryptionAtRest.html",
"Remediation": {
"Code": {
"CLI": "aws dax create-cluster --cluster-name <cluster_name> --node-type <node_type> --replication-factor <nodes_number> --iam-role-arn <role_arn> --sse-specification Enabled=true",
"NativeIaC": "https://docs.bridgecrew.io/docs/bc_aws_general_23#cloudformation",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/bc_aws_general_23#terraform"
},
"Recommendation": {
"Text": "Re-create the cluster to enable encryption at rest if it was not enabled at creation.",
"Url": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAXEncryptionAtRest.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Data Protection",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.dynamodb.dax_client import dax_client
class dynamodb_accelerator_cluster_encryption_enabled(Check):
def execute(self):
findings = []
for cluster in dax_client.clusters:
report = Check_Report(self.metadata)
report.resource_id = cluster.name
report.resource_arn = cluster.arn
report.region = cluster.region
report.status = "FAIL"
report.status_extended = f"DynamoDB cluster {cluster.name} does not have encryption at rest enabled."
if cluster.encryption:
report.status = "PASS"
report.status_extended = (
f"DynamoDB cluster {cluster.name} has encryption at rest enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,101 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_dax
from moto.core import DEFAULT_ACCOUNT_ID
AWS_REGION = "us-east-1"
class Test_dynamodb_accelerator_cluster_encryption_enabled:
@mock_dax
def test_dax_no_clusters(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled.dax_client",
new=DAX(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled import (
dynamodb_accelerator_cluster_encryption_enabled,
)
check = dynamodb_accelerator_cluster_encryption_enabled()
result = check.execute()
assert len(result) == 0
@mock_dax
def test_dax_cluster_no_encryption(self):
dax_client = client("dax", region_name=AWS_REGION)
iam_role_arn = f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
cluster = dax_client.create_cluster(
ClusterName="daxcluster",
NodeType="dax.t3.small",
ReplicationFactor=3,
IamRoleArn=iam_role_arn,
)["Cluster"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled.dax_client",
new=DAX(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled import (
dynamodb_accelerator_cluster_encryption_enabled,
)
check = dynamodb_accelerator_cluster_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have encryption at rest enabled",
result[0].status_extended,
)
assert result[0].resource_id == cluster["ClusterName"]
assert result[0].resource_arn == cluster["ClusterArn"]
@mock_dax
def test_dax_cluster_with_encryption(self):
dax_client = client("dax", region_name=AWS_REGION)
iam_role_arn = f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
cluster = dax_client.create_cluster(
ClusterName="daxcluster",
NodeType="dax.t3.small",
ReplicationFactor=3,
IamRoleArn=iam_role_arn,
SSESpecification={"Enabled": True},
)["Cluster"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled.dax_client",
new=DAX(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_accelerator_cluster_encryption_enabled.dynamodb_accelerator_cluster_encryption_enabled import (
dynamodb_accelerator_cluster_encryption_enabled,
)
check = dynamodb_accelerator_cluster_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("has encryption at rest enabled", result[0].status_extended)
assert result[0].resource_id == cluster["ClusterName"]
assert result[0].resource_arn == cluster["ClusterArn"]

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
dynamodb_client = DynamoDB(current_audit_info)

View File

@@ -0,0 +1,183 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## DynamoDB
class DynamoDB:
def __init__(self, audit_info):
self.service = "dynamodb"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.tables = []
self.__threading_call__(self.__list_tables__)
self.__describe_table__()
self.__describe_continuous_backups__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_tables__(self, regional_client):
logger.info("DynamoDB - Listing tables...")
try:
list_tables_paginator = regional_client.get_paginator("list_tables")
for page in list_tables_paginator.paginate():
for table in page["TableNames"]:
self.tables.append(
Table(
arn="",
name=table,
encryption_type=None,
kms_arn=None,
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
def __describe_table__(self):
logger.info("DynamoDB - Describing Table...")
try:
for table in self.tables:
regional_client = self.regional_clients[table.region]
properties = regional_client.describe_table(TableName=table.name)[
"Table"
]
table.arn = properties["TableArn"]
if "SSEDescription" in properties:
if "SSEType" in properties["SSEDescription"]:
table.encryption_type = properties["SSEDescription"]["SSEType"]
if table.encryption_type == "KMS":
table.kms_arn = properties["SSEDescription"]["KMSMasterKeyArn"]
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
def __describe_continuous_backups__(self):
logger.info("DynamoDB - Describing Continuous Backups...")
try:
for table in self.tables:
regional_client = self.regional_clients[table.region]
properties = regional_client.describe_continuous_backups(
TableName=table.name
)["ContinuousBackupsDescription"]
if "PointInTimeRecoveryDescription" in properties:
if (
properties["PointInTimeRecoveryDescription"][
"PointInTimeRecoveryStatus"
]
== "ENABLED"
):
table.pitr = True
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
################## DynamoDB DAX
class DAX:
def __init__(self, audit_info):
self.service = "dax"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.clusters = []
self.__threading_call__(self.__describe_clusters__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_clusters__(self, regional_client):
logger.info("DynamoDB DAX - Describing clusters...")
try:
describe_clusters_paginator = regional_client.get_paginator(
"describe_clusters"
)
for page in describe_clusters_paginator.paginate():
for cluster in page["Clusters"]:
encryption = False
if "SSEDescription" in cluster:
if cluster["SSEDescription"]["Status"] == "ENABLED":
encryption = True
self.clusters.append(
Cluster(
cluster["ClusterArn"],
cluster["ClusterName"],
encryption,
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
@dataclass
class Table:
arn: str
name: str
encryption_type: str
kms_arn: str
pitr: bool
region: str
def __init__(
self,
arn,
name,
encryption_type,
kms_arn,
region,
):
self.arn = arn
self.name = name
self.encryption_type = encryption_type
self.kms_arn = kms_arn
self.pitr = False
self.region = region
@dataclass
class Cluster:
arn: str
name: str
encryption: str
region: str
def __init__(
self,
arn,
name,
encryption,
region,
):
self.arn = arn
self.name = name
self.encryption = encryption
self.region = region

View File

@@ -0,0 +1,190 @@
from boto3 import client, session
from moto import mock_dax, mock_dynamodb
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.dynamodb.dynamodb_service import DAX, DynamoDB
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
class Test_DynamoDB_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test Dynamo Service
@mock_dynamodb
def test_service(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.service == "dynamodb"
# Test Dynamo Client
@mock_dynamodb
def test_client(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
dynamodb = DynamoDB(audit_info)
for regional_client in dynamodb.regional_clients.values():
assert regional_client.__class__.__name__ == "DynamoDB"
# Test Dynamo Session
@mock_dynamodb
def test__get_session__(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.session.__class__.__name__ == "Session"
# Test Dynamo Session
@mock_dynamodb
def test_audited_account(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.audited_account == AWS_ACCOUNT_NUMBER
# Test DynamoDB List Tables
@mock_dynamodb
def test__list_tables__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
# Create DynamoDB Tables
dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)
dynamodb_client.create_table(
TableName="test2",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 2
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[1].name == "test2"
assert dynamo.tables[0].region == AWS_REGION
assert dynamo.tables[1].region == AWS_REGION
# Test DynamoDB Describe Table
@mock_dynamodb
def test__describe_table__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
# Create DynamoDB Table
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)["TableDescription"]
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 1
assert dynamo.tables[0].arn == table["TableArn"]
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[0].region == AWS_REGION
# Test DynamoDB Describe Table
@mock_dynamodb
def test__describe_continuous_backups__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
# Create DynamoDB Table
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)["TableDescription"]
dynamodb_client.update_continuous_backups(
TableName="test1",
PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 1
assert dynamo.tables[0].arn == table["TableArn"]
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[0].pitr
assert dynamo.tables[0].region == AWS_REGION
# Test DAX List Tables
@mock_dax
def test__describe_clusters__(self):
# Generate DAX Client
dax_client = client("dax", region_name=AWS_REGION)
# Create DAX Clusters
iam_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
dax_client.create_cluster(
ClusterName="daxcluster1",
NodeType="dax.t3.small",
ReplicationFactor=3,
IamRoleArn=iam_role_arn,
SSESpecification={"Enabled": True},
)
dax_client.create_cluster(
ClusterName="daxcluster2",
NodeType="dax.t3.small",
ReplicationFactor=3,
IamRoleArn=iam_role_arn,
SSESpecification={"Enabled": True},
)
# DAX client for this test class
audit_info = self.set_mocked_audit_info()
dax = DAX(audit_info)
assert len(dax.clusters) == 2
assert dax.clusters[0].name == "daxcluster1"
assert dax.clusters[1].name == "daxcluster2"
assert dax.clusters[0].region == AWS_REGION
assert dax.clusters[1].region == AWS_REGION

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "dynamodb_tables_kms_cmk_encryption_enabled",
"CheckTitle": "Check if DynamoDB table has encryption at rest enabled using CMK KMS.",
"CheckType": ["Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"],
"ServiceName": "dynamodb",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:dynamodb:region:account-id:table/resource-id",
"Severity": "medium",
"ResourceType": "AwsDynamoDBTable",
"Description": "Check if DynamoDB table has encryption at rest enabled using CMK KMS.",
"Risk": "All user data stored in Amazon DynamoDB is fully encrypted at rest. This functionality helps reduce the operational burden and complexity involved in protecting sensitive data.",
"RelatedUrl": "https://docs.aws.amazon.com/amazondynamodbdb/latest/developerguide/EncryptionAtRest.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-that-dynamodb-tables-are-encrypted#terraform"
},
"Recommendation": {
"Text": "Specify an encryption key when you create a new table or switch the encryption keys on an existing table by using the AWS Management Console.",
"Url": "https://docs.aws.amazon.com/amazondynamodbdb/latest/developerguide/EncryptionAtRest.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Data Protection",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.dynamodb.dynamodb_client import dynamodb_client
class dynamodb_tables_kms_cmk_encryption_enabled(Check):
def execute(self):
findings = []
for table in dynamodb_client.tables:
report = Check_Report(self.metadata)
report.resource_id = table.name
report.resource_arn = table.arn
report.region = table.region
report.status = "FAIL"
report.status_extended = (
f"DynamoDB table {table.name} does have DEFAULT encryption enabled."
)
if table.encryption_type == "KMS":
report.status = "PASS"
report.status_extended = f"DynamoDB table {table.name} does have KMS encryption enabled with key {table.kms_arn.split('/')[1]}."
findings.append(report)
return findings

View File

@@ -0,0 +1,107 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_dynamodb
AWS_REGION = "us-east-1"
class Test_dynamodb_tables_kms_cmk_encryption_enabled:
@mock_dynamodb
def test_dynamodb_no_tables(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled import (
dynamodb_tables_kms_cmk_encryption_enabled,
)
check = dynamodb_tables_kms_cmk_encryption_enabled()
result = check.execute()
assert len(result) == 0
@mock_dynamodb
def test_dynamodb_table_kms_encryption(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
SSESpecification={"Enabled": True, "KMSMasterKeyId": "/custom-kms-key"},
)["TableDescription"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled import (
dynamodb_tables_kms_cmk_encryption_enabled,
)
check = dynamodb_tables_kms_cmk_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("KMS encryption enabled", result[0].status_extended)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
@mock_dynamodb
def test_dynamodb_table_default_encryption(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)["TableDescription"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_kms_cmk_encryption_enabled.dynamodb_tables_kms_cmk_encryption_enabled import (
dynamodb_tables_kms_cmk_encryption_enabled,
)
check = dynamodb_tables_kms_cmk_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("DEFAULT encryption enabled", result[0].status_extended)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "dynamodb_tables_pitr_enabled",
"CheckTitle": "Check if DynamoDB tables point-in-time recovery (PITR) is enabled.",
"CheckType": ["Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"],
"ServiceName": "dynamodb",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:dynamodb:region:account-id:certificate/resource-id",
"Severity": "medium",
"ResourceType": "AwsDynamoDBTable",
"Description": "Check if DynamoDB tables point-in-time recovery (PITR) is enabled.",
"Risk": "If the DynamoDB Table does not have point-in-time recovery enabled, it is vulnerable to accidental write or delete operations.",
"RelatedUrl": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html",
"Remediation": {
"Code": {
"CLI": "aws dynamodb update-continuous-backups --table-name <table_name> --point-in-time-recovery-specification PointInTimeRecoveryEnabled=true",
"NativeIaC": "https://docs.bridgecrew.io/docs/general_6#cloudformation--serverless",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/general_6#terraform"
},
"Recommendation": {
"Text": "Enable point-in-time recovery, this is not enabled by default.",
"Url": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Data Protection",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.dynamodb.dynamodb_client import dynamodb_client
class dynamodb_tables_pitr_enabled(Check):
def execute(self):
findings = []
for table in dynamodb_client.tables:
report = Check_Report(self.metadata)
report.resource_id = table.name
report.resource_arn = table.arn
report.region = table.region
report.status = "FAIL"
report.status_extended = f"DynamoDB table {table.name} does not have point-in-time recovery enabled."
if table.pitr:
report.status = "PASS"
report.status_extended = (
f"DynamoDB table {table.name} has point-in-time recovery enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,115 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_dynamodb
AWS_REGION = "us-east-1"
class Test_dynamodb_tables_pitr_enabled:
@mock_dynamodb
def test_dynamodb_no_tables(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled import (
dynamodb_tables_pitr_enabled,
)
check = dynamodb_tables_pitr_enabled()
result = check.execute()
assert len(result) == 0
@mock_dynamodb
def test_dynamodb_table_no_pitr(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)["TableDescription"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled import (
dynamodb_tables_pitr_enabled,
)
check = dynamodb_tables_pitr_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have point-in-time recovery enabled",
result[0].status_extended,
)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
@mock_dynamodb
def test_dynamodb_table_with_pitr(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)["TableDescription"]
dynamodb_client.update_continuous_backups(
TableName="test1",
PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled.dynamodb_client",
new=DynamoDB(current_audit_info),
):
# Test Check
from providers.aws.services.dynamodb.dynamodb_tables_pitr_enabled.dynamodb_tables_pitr_enabled import (
dynamodb_tables_pitr_enabled,
)
check = dynamodb_tables_pitr_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has point-in-time recovery enabled", result[0].status_extended
)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]

View File

@@ -84,7 +84,7 @@ class Test_iam_disable_30_days_credentials_test:
) )
service_client.users[0].password_last_used = "" service_client.users[0].password_last_used = ""
print(service_client.users)
# raise Exception # raise Exception
check = iam_disable_30_days_credentials() check = iam_disable_30_days_credentials()
result = check.execute() result = check.execute()

View File

@@ -84,7 +84,7 @@ class Test_iam_disable_90_days_credentials_test:
) )
service_client.users[0].password_last_used = "" service_client.users[0].password_last_used = ""
print(service_client.users)
# raise Exception # raise Exception
check = iam_disable_90_days_credentials() check = iam_disable_90_days_credentials()
result = check.execute() result = check.execute()

View File

@@ -31,7 +31,7 @@ class Test_iam_no_root_access_key_test:
service_client.credential_report[0]["access_key_2_active"] = "false" service_client.credential_report[0]["access_key_2_active"] = "false"
check = iam_no_root_access_key() check = iam_no_root_access_key()
result = check.execute() result = check.execute()
print(service_client.credential_report)
# raise Exception # raise Exception
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert search(
@@ -69,7 +69,7 @@ class Test_iam_no_root_access_key_test:
service_client.credential_report[0]["access_key_2_active"] = "false" service_client.credential_report[0]["access_key_2_active"] = "false"
check = iam_no_root_access_key() check = iam_no_root_access_key()
result = check.execute() result = check.execute()
print(service_client.credential_report)
# raise Exception # raise Exception
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert search(
@@ -107,7 +107,7 @@ class Test_iam_no_root_access_key_test:
service_client.credential_report[0]["access_key_2_active"] = "true" service_client.credential_report[0]["access_key_2_active"] = "true"
check = iam_no_root_access_key() check = iam_no_root_access_key()
result = check.execute() result = check.execute()
print(service_client.credential_report)
# raise Exception # raise Exception
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert search(
@@ -145,7 +145,7 @@ class Test_iam_no_root_access_key_test:
service_client.credential_report[0]["access_key_2_active"] = "true" service_client.credential_report[0]["access_key_2_active"] = "true"
check = iam_no_root_access_key() check = iam_no_root_access_key()
result = check.execute() result = check.execute()
print(service_client.credential_report)
# raise Exception # raise Exception
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert search(