feat(cloudformation): Service and Checks (#1454)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2022-11-07 16:17:38 +01:00
committed by GitHub
parent f5873fe0d7
commit 69d3a9e363
16 changed files with 880 additions and 171 deletions

View File

@@ -1,60 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Remediation:
#
# https://docs.aws.amazon.com/cli/latest/reference/cloudformation/update-termination-protection.html
#
# aws cloudformation update-termination-protection \
# --stack-name my-stack \
# --enable-termination-protection
CHECK_ID_extra7154="7.154"
CHECK_TITLE_extra7154="[extra7154] Enable termination protection for Cloudformation Stacks"
CHECK_SCORED_extra7154="NOT_SCORED"
CHECK_CIS_LEVEL_extra7154="EXTRA"
CHECK_SEVERITY_extra7154="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7154="AwsCloudFormationStack"
CHECK_ALTERNATE_check7154="extra7154"
CHECK_SERVICENAME_extra7154="cloudformation"
CHECK_RISK_extra7154='Without termination protection enabled; a critical cloudformation stack can be accidently deleted.'
CHECK_REMEDIATION_extra7154='Ensure termination protection is enabled for the cloudformation stacks'
CHECK_DOC_extra7154='https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-protect-stacks.html'
CHECK_CAF_EPIC_extra7154='Infrastructure Protection'
extra7154() {
for regx in $REGIONS; do
CFN_STACKS=$($AWSCLI cloudformation describe-stacks $PROFILE_OPT --region $regx --output json 2>&1)
if [[ $(echo "$CFN_STACKS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe stacks" "$regx"
continue
fi
LIST_OF_CFN_STACKS=$(echo $CFN_STACKS | jq -r '.Stacks[].StackName')
if [[ $LIST_OF_CFN_STACKS ]];then
for stack in $LIST_OF_CFN_STACKS; do
CFN_STACK_DETAILS=$($AWSCLI cloudformation describe-stacks $PROFILE_OPT --region $regx --stack-name $stack --output json)
TERMINATION_ENABLED=$(echo $CFN_STACK_DETAILS | jq -r '.Stacks[].EnableTerminationProtection')
ROOT_ID=$(echo $CFN_STACK_DETAILS | jq -r '.Stacks[].RootId')
if [[ $ROOT_ID != null && $TERMINATION_ENABLED == "false" ]]; then
textInfo "$regx: $stack is a nested stack. Enable termination protection on the root stack $ROOT_ID" "$regx" "$stack" "$ROOT_ID"
elif [[ $TERMINATION_ENABLED == "true" ]]; then
textPass "$regx: Cloudformation stack $stack has termination protection enabled" "$regx" "$stack"
else
textFail "$regx: Cloudformation stack $stack has termination protection disabled" "$regx" "$stack"
fi
done
else
textInfo "$regx: No Cloudformation stacks found" "$regx"
fi
done
}

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra742="7.42"
CHECK_TITLE_extra742="[extra742] Find secrets in CloudFormation outputs"
CHECK_SCORED_extra742="NOT_SCORED"
CHECK_CIS_LEVEL_extra742="EXTRA"
CHECK_SEVERITY_extra742="Critical"
CHECK_ASFF_RESOURCE_TYPE_extra742="AwsCloudFormationStack"
CHECK_ALTERNATE_check742="extra742"
CHECK_SERVICENAME_extra742="cloudformation"
CHECK_RISK_extra742='Secrets hardcoded into CloudFormation outputs can be used by malware and bad actors to gain lateral access to other services.'
CHECK_REMEDIATION_extra742='Implement automated detective control (e.g. using tools like Prowler ) to scan accounts for passwords and secrets. Use secrets manager service to store and retrieve passwords and secrets. '
CHECK_DOC_extra742='https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-secretsmanager-secret-generatesecretstring.html'
CHECK_CAF_EPIC_extra742='IAM'
extra742(){
SECRETS_TEMP_FOLDER="$PROWLER_DIR/secrets-$ACCOUNT_NUM"
if [[ ! -d $SECRETS_TEMP_FOLDER ]]; then
# this folder is deleted once this check is finished
mkdir "${SECRETS_TEMP_FOLDER}"
fi
for regx in $REGIONS; do
CHECK_DETECT_SECRETS_INSTALLATION=$(secretsDetector)
if [[ $? -eq 241 ]]; then
textInfo "$regx: python library detect-secrets not found. Make sure it is installed correctly." "$regx"
else
CFN_STACKS=$("${AWSCLI}" cloudformation describe-stacks $PROFILE_OPT --region "${regx}" --output json 2>&1)
if grep -q -E 'AccessDenied|UnauthorizedOperation|AuthorizationError' <<< "$CFN_STACKS" ; then
textInfo "$regx: Access Denied trying to describe stacks" "$regx"
continue
fi
LIST_OF_CFN_STACKS=$(jq -r '.Stacks[].StackName' <<< "${CFN_STACKS}")
if [[ $LIST_OF_CFN_STACKS ]];then
for stackName in $LIST_OF_CFN_STACKS; do
CFN_OUTPUTS_FILE="$SECRETS_TEMP_FOLDER/extra742-${stackName}-${regx}-outputs.txt"
# OutputKey and OutputValue are separated by a colon because secrets-detector needs a way to link both values
jq --arg stackName "$stackName" -r '.Stacks[] | select( .StackName == $stackName ) | .Outputs[]? | "\(.OutputKey):\(.OutputValue)"' <<< "${CFN_STACKS}" > "${CFN_OUTPUTS_FILE}"
if [ -s "${CFN_OUTPUTS_FILE}" ];then
FINDINGS=$(secretsDetector file "${CFN_OUTPUTS_FILE}")
if [[ $FINDINGS -eq 0 ]]; then
textPass "$regx: No secrets found in stack ${stackName} Outputs" "$regx" "${stackName}"
# Delete file if nothing interesting is there
rm -f "${CFN_OUTPUTS_FILE}"
else
textFail "$regx: Potential secret found in stack ${stackName} Outputs" "$regx" "${stackName}"
# Delete file to not leave trace, user must look at the CFN Stack
rm -f "${CFN_OUTPUTS_FILE}"
fi
else
textInfo "$regx: CloudFormation stack ${stackName} has no Outputs" "$regx"
fi
done
else
textInfo "$regx: No CloudFormation stacks found" "$regx"
fi
fi
done
# Cleanup temporary folder
if [[ -d $SECRETS_TEMP_FOLDER ]]
then
rm -rf "${SECRETS_TEMP_FOLDER}"
fi
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.cloudformation.cloudformation_service import CloudFormation
cloudformation_client = CloudFormation(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "cloudformation_outputs_find_secrets",
"CheckTitle": "Find secrets in CloudFormation outputs",
"CheckType": [],
"ServiceName": "cloudformation",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:cloudformation:region:account-id:stack/resource-id",
"Severity": "critical",
"ResourceType": "AwsCloudFormationStack",
"Description": "Find secrets in CloudFormation outputs",
"Risk": "Secrets hardcoded into CloudFormation outputs can be used by malware and bad actors to gain lateral access to other services.",
"RelatedUrl": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-secretsmanager-secret-generatesecretstring.html",
"Remediation": {
"Code": {
"CLI": "https://docs.bridgecrew.io/docs/bc_aws_secrets_2#cli-command",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Implement automated detective control to scan accounts for passwords and secrets. Use secrets manager service to store and retrieve passwords and secrets.",
"Url": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-secretsmanager-secret-generatesecretstring.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Protection",
"Compliance": []
}

View File

@@ -0,0 +1,57 @@
import os
import tempfile
from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings
from lib.check.models import Check, Check_Report
from providers.aws.services.cloudformation.cloudformation_client import (
cloudformation_client,
)
class cloudformation_outputs_find_secrets(Check):
"""Check if a CloudFormation Stack has secrets in their Outputs"""
def execute(self):
"""Execute the cloudformation_outputs_find_secrets check"""
findings = []
for stack in cloudformation_client.stacks:
report = Check_Report(self.metadata)
report.region = stack.region
report.resource_id = stack.name
report.resource_arn = stack.arn
if stack.outputs:
temp_output_file = tempfile.NamedTemporaryFile(delete=False)
# Store the CloudFormation Stack Outputs into a file
for output in stack.outputs:
temp_output_file.write(f"{output}".encode())
temp_output_file.close()
# Init detect_secrets
secrets = SecretsCollection()
# Scan file for secrets
with default_settings():
secrets.scan_file(temp_output_file.name)
if secrets.json():
report.status = "FAIL"
report.status_extended = (
f"Potential secret found in Stack {stack.name} Outputs."
)
else:
report.status = "PASS"
report.status_extended = (
f"No secrets found in Stack {stack.name} Outputs."
)
os.remove(temp_output_file.name)
else:
report.status = "PASS"
report.status_extended = f"CloudFormation {stack.name} has no Outputs."
findings.append(report)
return findings

View File

@@ -0,0 +1,133 @@
from unittest import mock
from providers.aws.services.cloudformation.cloudformation_service import Stack
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_cloudformation_outputs_find_secrets:
def test_no_stacks(self):
cloudformation_client = mock.MagicMock
cloudformation_client.stacks = []
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
new=cloudformation_client,
):
# Test Check
from providers.aws.services.cloudformation.cloudformation_outputs_find_secrets.cloudformation_outputs_find_secrets import (
cloudformation_outputs_find_secrets,
)
check = cloudformation_outputs_find_secrets()
result = check.execute()
assert len(result) == 0
def test_stack_secret_in_outputs(self):
cloudformation_client = mock.MagicMock
stack_name = "Test-Stack"
cloudformation_client.stacks = [
Stack(
arn="arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
name=stack_name,
outputs=["DB_PASSWORD:foobar123", "ENV:DEV"],
region=AWS_REGION,
)
]
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
cloudformation_client,
):
from providers.aws.services.cloudformation.cloudformation_outputs_find_secrets.cloudformation_outputs_find_secrets import (
cloudformation_outputs_find_secrets,
)
check = cloudformation_outputs_find_secrets()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Stack {stack_name} Outputs."
)
assert result[0].resource_id == "Test-Stack"
assert (
result[0].resource_arn
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
def test_stack_no_secret_in_outputs(self):
cloudformation_client = mock.MagicMock
stack_name = "Test-Stack"
cloudformation_client.stacks = [
Stack(
arn="arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
name=stack_name,
outputs=["ENV:DEV"],
region=AWS_REGION,
)
]
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
cloudformation_client,
):
from providers.aws.services.cloudformation.cloudformation_outputs_find_secrets.cloudformation_outputs_find_secrets import (
cloudformation_outputs_find_secrets,
)
check = cloudformation_outputs_find_secrets()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in Stack {stack_name} Outputs."
)
assert result[0].resource_id == "Test-Stack"
assert (
result[0].resource_arn
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
def test_stack_no_outputs(self):
cloudformation_client = mock.MagicMock
stack_name = "Test-Stack"
cloudformation_client.stacks = [
Stack(
arn="arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
name=stack_name,
outputs=[],
region=AWS_REGION,
)
]
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
cloudformation_client,
):
from providers.aws.services.cloudformation.cloudformation_outputs_find_secrets.cloudformation_outputs_find_secrets import (
cloudformation_outputs_find_secrets,
)
check = cloudformation_outputs_find_secrets()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFormation {stack_name} has no Outputs."
)
assert result[0].resource_id == "Test-Stack"
assert (
result[0].resource_arn
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION

View File

@@ -0,0 +1,108 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## CloudFormation
class CloudFormation:
def __init__(self, audit_info):
self.service = "cloudformation"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.stacks = []
self.__threading_call__(self.__describe_stacks__)
self.__describe_stack__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_stacks__(self, regional_client):
"""Get ALL CloudFormation Stacks"""
logger.info("CloudFormation - Describing Stacks...")
try:
describe_stacks_paginator = regional_client.get_paginator("describe_stacks")
for page in describe_stacks_paginator.paginate():
for stack in page["Stacks"]:
outputs = []
for output in stack["Outputs"]:
outputs.append(f"{output['OutputKey']}:{output['OutputValue']}")
self.stacks.append(
Stack(
arn=stack["StackId"],
name=stack["StackName"],
outputs=outputs,
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_stack__(self):
"""Get Details for a CloudFormation Stack"""
logger.info("CloudFormation - Describing Stack to get specific details...")
try:
for stack in self.stacks:
stack_details = self.regional_clients[stack.region].describe_stacks(
StackName=stack.name
)
# Termination Protection
stack.enable_termination_protection = stack_details["Stacks"][0][
"EnableTerminationProtection"
]
# Nested Stack
if "RootId" in stack_details["Stacks"][0]:
stack.root_nested_stack = stack_details["Stacks"][0]["RootId"]
stack.is_nested_stack = True if stack.root_nested_stack != "" else False
except Exception as error:
logger.error(
f"{stack.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class Stack:
"""Stack holds a CloudFormation Stack"""
arn: str
"""In the CloudFormation API the "Stacks[].StackId" is the ARN"""
name: str
"""Stacks[].StackName"""
outputs: list[str]
"""Stacks[].Outputs"""
enable_termination_protection: bool
"""Stacks[].EnableTerminationProtection"""
root_nested_stack: str
"""Stacks[].RootId"""
is_nested_stack: str
"""True if the Stack is a Nested Stack"""
region: str
def __init__(
self,
arn,
name,
outputs,
region,
):
self.arn = arn
self.name = name
self.outputs = outputs
self.enable_termination_protection = False
self.is_nested_stack = False
self.root_nested_stack = ""
self.region = region

View File

@@ -0,0 +1,213 @@
import datetime
import json
from unittest.mock import patch
import boto3
import botocore
from boto3 import session
from dateutil.tz import tzutc
from moto import mock_cloudformation
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from providers.aws.services.cloudformation.cloudformation_service import CloudFormation
# Mock Test Region
AWS_REGION = "eu-west-1"
# Dummy CloudFormation Template
dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Stack 1",
"Resources": {
"EC2Instance1": {
"Type": "AWS::EC2::Instance",
"Properties": {
"ImageId": "EXAMPLE_AMI_ID",
"KeyName": "dummy",
"InstanceType": "t2.micro",
"Tags": [
{"Key": "Description", "Value": "Test tag"},
{"Key": "Name", "Value": "Name tag for tests"},
],
},
}
},
}
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "CreateStack":
return {
"StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
}
if operation_name == "DescribeStacks":
print(f"ARGS: {kwarg}")
if "StackName" in kwarg:
return {
"Stacks": [
{
"StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
"StackName": "Test-Stack",
"Description": "Stack 1",
"Parameters": [],
"CreationTime": datetime.datetime(
2022, 11, 7, 9, 33, 51, tzinfo=tzutc()
),
"StackStatus": "CREATE_COMPLETE",
"DisableRollback": False,
"NotificationARNs": [],
"Outputs": [
{
"OutputKey": "TestOutput1",
"OutputValue": "TestValue1",
"Description": "Test Output Description.",
}
],
"RoleARN": "arn:aws:iam::123456789012:role/moto",
"EnableTerminationProtection": True,
"Tags": [
{"Key": "Tag1", "Value": "Value1"},
{"Key": "Tag2", "Value": "Value2"},
],
}
]
}
# Return all Stacks
else:
return {
"Stacks": [
{
"StackId": "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
"StackName": "Test-Stack",
"Description": "Stack 1",
"Parameters": [],
"CreationTime": datetime.datetime(
2022, 11, 7, 9, 33, 51, tzinfo=tzutc()
),
"StackStatus": "CREATE_COMPLETE",
"DisableRollback": False,
"NotificationARNs": [],
"Outputs": [
{
"OutputKey": "TestOutput1",
"OutputValue": "TestValue1",
"Description": "Test Output Description.",
}
],
"RoleARN": "arn:aws:iam::123456789012:role/moto",
"Tags": [
{"Key": "Tag1", "Value": "Value1"},
{"Key": "Tag2", "Value": "Value2"},
],
}
]
}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.cloudformation.cloudformation_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_CloudFormation_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition=None,
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test CloudFormation Client
@mock_cloudformation
def test__get_client__(self):
cloudformation = CloudFormation(self.set_mocked_audit_info())
assert (
cloudformation.regional_clients[AWS_REGION].__class__.__name__
== "CloudFormation"
)
# Test CloudFormation Service
@mock_cloudformation
def test__get_service__(self):
cloudformation = CloudFormation(self.set_mocked_audit_info())
assert (
cloudformation.regional_clients[AWS_REGION].__class__.__name__
== "CloudFormation"
)
# Test CloudFormation Session
@mock_cloudformation
def test__get_session__(self):
cloudformation = CloudFormation(self.set_mocked_audit_info())
assert cloudformation.session.__class__.__name__ == "Session"
@mock_cloudformation
def test__describe_stacks__(self):
cloudformation_client = boto3.client("cloudformation", region_name=AWS_REGION)
stack_arn = cloudformation_client.create_stack(
StackName="Test-Stack",
TemplateBody=json.dumps(dummy_template),
RoleARN=f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:role/moto",
Tags=[
{"Key": "Tag1", "Value": "Value1"},
{"Key": "Tag2", "Value": "Value2"},
],
EnableTerminationProtection=True,
Outputs=[
{
"OutputKey": "TestOutput1",
"OutputValue": "TestValue1",
"Description": "Test Output Description.",
}
],
)
cloudformation = CloudFormation(self.set_mocked_audit_info())
assert len(cloudformation.stacks) == 1
assert cloudformation.stacks[0].arn == stack_arn["StackId"]
assert cloudformation.stacks[0].name == "Test-Stack"
assert cloudformation.stacks[0].outputs == ["TestOutput1:TestValue1"]
assert cloudformation.stacks[0].enable_termination_protection == True
assert cloudformation.stacks[0].is_nested_stack == False
assert cloudformation.stacks[0].root_nested_stack == ""
assert cloudformation.stacks[0].region == AWS_REGION

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "cloudformation_stacks_termination_protection_enabled",
"CheckTitle": "Enable termination protection for Cloudformation Stacks",
"CheckType": [],
"ServiceName": "cloudformation",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:cloudformation:region:account-id:stack/resource-id",
"Severity": "medium",
"ResourceType": "AwsCloudFormationStack",
"Description": "Enable termination protection for Cloudformation Stacks",
"Risk": "Without termination protection enabled; a critical cloudformation stack can be accidently deleted.",
"RelatedUrl": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-protect-stacks.html",
"Remediation": {
"Code": {
"CLI": "aws cloudformation update-termination-protection --region us-east-1 --stack-name <STACK_NAME> --enable-termination-protection",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure termination protection is enabled for the cloudformation stacks.",
"Url": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-protect-stacks.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Protection",
"Compliance": []
}

View File

@@ -0,0 +1,28 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.cloudformation.cloudformation_client import (
cloudformation_client,
)
class cloudformation_stacks_termination_protection_enabled(Check):
"""Check if a CloudFormation Stack has the Termination Protection enabled"""
def execute(self):
"""Execute the cloudformation_stacks_termination_protection_enabled check"""
findings = []
for stack in cloudformation_client.stacks:
if not stack.is_nested_stack:
report = Check_Report(self.metadata)
report.region = stack.region
report.resource_id = stack.name
report.resource_arn = stack.arn
if stack.enable_termination_protection:
report.status = "PASS"
report.status_extended = f"CloudFormation {stack.name} has termination protection enabled"
else:
report.status = "FAIL"
report.status_extended = f"CloudFormation {stack.name} has termination protection disabled"
findings.append(report)
return findings

View File

@@ -0,0 +1,99 @@
from unittest import mock
from providers.aws.services.cloudformation.cloudformation_service import Stack
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_cloudformation_stacks_termination_protection_enabled:
def test_no_stacks(self):
cloudformation_client = mock.MagicMock
cloudformation_client.stacks = []
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
new=cloudformation_client,
):
# Test Check
from providers.aws.services.cloudformation.cloudformation_stacks_termination_protection_enabled.cloudformation_stacks_termination_protection_enabled import (
cloudformation_stacks_termination_protection_enabled,
)
check = cloudformation_stacks_termination_protection_enabled()
result = check.execute()
assert len(result) == 0
def test_stack_termination_protection_enabled(self):
cloudformation_client = mock.MagicMock
stack_name = "Test-Stack"
cloudformation_client.stacks = [
Stack(
arn="arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
name=stack_name,
outputs="",
region=AWS_REGION,
)
]
cloudformation_client.stacks[0].enable_termination_protection = True
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
cloudformation_client,
):
from providers.aws.services.cloudformation.cloudformation_stacks_termination_protection_enabled.cloudformation_stacks_termination_protection_enabled import (
cloudformation_stacks_termination_protection_enabled,
)
check = cloudformation_stacks_termination_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFormation {stack_name} has termination protection enabled"
)
assert result[0].resource_id == "Test-Stack"
assert (
result[0].resource_arn
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION
def test_stack_termination_protection_disabled(self):
cloudformation_client = mock.MagicMock
stack_name = "Test-Stack"
cloudformation_client.stacks = [
Stack(
arn="arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60",
name=stack_name,
outputs="",
region=AWS_REGION,
)
]
cloudformation_client.stacks[0].enable_termination_protection = False
with mock.patch(
"providers.aws.services.cloudformation.cloudformation_service.CloudFormation",
cloudformation_client,
):
from providers.aws.services.cloudformation.cloudformation_stacks_termination_protection_enabled.cloudformation_stacks_termination_protection_enabled import (
cloudformation_stacks_termination_protection_enabled,
)
check = cloudformation_stacks_termination_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFormation {stack_name} has termination protection disabled"
)
assert result[0].resource_id == "Test-Stack"
assert (
result[0].resource_arn
== "arn:aws:cloudformation:eu-west-1:123456789012:stack/Test-Stack/796c8d26-b390-41d7-a23c-0702c4e78b60"
)
assert result[0].region == AWS_REGION