feat(secretsmanager): Service and check (#1483)

This commit is contained in:
Pepe Fagoaga
2022-11-16 10:23:05 +01:00
committed by GitHub
parent 2c5320a0b0
commit 684b7fe0b8
9 changed files with 372 additions and 61 deletions

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
# Remediation:
#
# https://docs.aws.amazon.com/cli/latest/reference/secretsmanager/rotate-secret.html
#
# rotate-secret
# --secret-id <value>
# [--client-request-token <value>]
# [--rotation-lambda-arn <value>]
# [--rotation-rules <value>]
# [--cli-input-json <value>]
# [--generate-cli-skeleton <value>]
CHECK_ID_extra7163="7.163"
CHECK_TITLE_extra7163="[extra7163] Check if Secrets Manager key rotation is enabled"
CHECK_SCORED_extra7163="NOT_SCORED"
CHECK_CIS_LEVEL_extra7163="EXTRA"
CHECK_SEVERITY_extra7163="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra7163="AwsSecretsManagerSecret"
CHECK_ALTERNATE_extra7163="extra7163"
CHECK_SERVICENAME_extra7163="secretsmanager"
CHECK_RISK_extra7163="Rotating secrets minimizes exposure to attacks using stolen keys."
CHECK_REMEDIATION_extra7163="Enable key rotation on Secrets Manager key."
CHECK_DOC_extra7163="https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html"
CHECK_CAF_EPIC_extra7163="Data Protection"
extra7163(){
# "Check if Secrets Manager key rotation is enabled"
for regx in $REGIONS; do
LIST_OF_SECRETS=$($AWSCLI secretsmanager list-secrets $PROFILE_OPT --region $regx --query 'SecretList[*].Name' --output text 2>&1)
if [[ $(echo "$LIST_OF_SECRETS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to list secrets" "$regx"
continue
fi
if [[ $LIST_OF_SECRETS ]]; then
for secret in $LIST_OF_SECRETS; do
KEY_ROTATION_ENABLED=$($AWSCLI secretsmanager describe-secret $PROFILE_OPT --region $regx --secret-id $secret --output json | jq '.RotationEnabled')
if [[ $KEY_ROTATION_ENABLED == true ]]; then
textPass "$regx: $secret has key rotation enabled." "$regx" "$secret"
else
textFail "$regx: $secret does not have key rotation enabled." "$regx" "$secret"
fi
done
else
textPass "$regx: No Secrets Manager secrets found." "$regx"
fi
done
}

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "secretsmanager_automatic_rotation_enabled",
"CheckTitle": "Check if Secrets Manager key rotation is enabled.",
"CheckType": [],
"ServiceName": "secretsmanager",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:secretsmanager:region:account-id:secret:secret-name",
"Severity": "medium",
"ResourceType": "AwsSecretsManagerSecret",
"Description": "Check if Secrets Manager key rotation is enabled.",
"Risk": "Rotating secrets minimizes exposure to attacks using stolen keys.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html",
"Remediation": {
"Code": {
"CLI": "aws secretsmanager rotate-secret --region <REGION> --secret-id <SECRET-ID> --rotation-lambda-arn <LAMBDA-ARN> --rotation-rules AutomaticallyAfterDays=30",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Implement automated detective control to scan accounts for passwords and secrets. Use secrets manager service to store and retrieve passwords and secrets.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "Infrastructure Protection",
"Compliance": []
}

View File

@@ -0,0 +1,29 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.secretsmanager.secretsmanager_client import (
secretsmanager_client,
)
class secretsmanager_automatic_rotation_enabled(Check):
def execute(self):
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report(self.metadata)
report.region = secret.region
report.resource_id = secret.name
report.resource_arn = secret.arn
if secret.rotation_enabled:
report.status = "PASS"
report.status_extended = (
f"SecretsManager secret {secret.name} has rotation enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"SecretsManager secret {secret.name} has rotation disabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,95 @@
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.secretsmanager.secretsmanager_service import Secret
# Mock Test Region
AWS_REGION = "eu-west-1"
class Test_secretsmanager_automatic_rotation_enabled:
def test_no_secrets(self):
secretsmanager_client = mock.MagicMock
secretsmanager_client.secrets = {}
with mock.patch(
"providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager",
new=secretsmanager_client,
):
# Test Check
from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import (
secretsmanager_automatic_rotation_enabled,
)
check = secretsmanager_automatic_rotation_enabled()
result = check.execute()
assert len(result) == 0
def test_secret_rotation_disabled(self):
secretsmanager_client = mock.MagicMock
secret_name = "test-secret"
secret_arn = f"arn:aws:secretsmanager:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:secret:{secret_name}"
secretsmanager_client.secrets = {
secret_name: Secret(
arn=secret_arn,
region=AWS_REGION,
name=secret_name,
rotation_enabled=False,
)
}
with mock.patch(
"providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager",
new=secretsmanager_client,
):
# Test Check
from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import (
secretsmanager_automatic_rotation_enabled,
)
check = secretsmanager_automatic_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == secret_name
assert result[0].resource_arn == secret_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"SecretsManager secret {secret_name} has rotation disabled."
)
def test_secret_rotation_enabled(self):
secretsmanager_client = mock.MagicMock
secret_name = "test-secret"
secret_arn = f"arn:aws:secretsmanager:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:secret:{secret_name}"
secretsmanager_client.secrets = {
secret_name: Secret(
arn=secret_arn,
region=AWS_REGION,
name=secret_name,
rotation_enabled=True,
)
}
with mock.patch(
"providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager",
new=secretsmanager_client,
):
# Test Check
from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import (
secretsmanager_automatic_rotation_enabled,
)
check = secretsmanager_automatic_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION
assert result[0].resource_id == secret_name
assert result[0].resource_arn == secret_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SecretsManager secret {secret_name} has rotation enabled."
)

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.secretsmanager.secretsmanager_service import SecretsManager
secretsmanager_client = SecretsManager(current_audit_info)

View File

@@ -0,0 +1,56 @@
import threading
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## SecretsManager
class SecretsManager:
def __init__(self, audit_info):
self.service = "secretsmanager"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.secrets = {}
self.__threading_call__(self.__list_secrets__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_secrets__(self, regional_client):
logger.info("SecretsManager - Listing Secrets...")
try:
list_secrets_paginator = regional_client.get_paginator("list_secrets")
for page in list_secrets_paginator.paginate():
for secret in page["SecretList"]:
self.secrets[secret["Name"]] = Secret(
arn=secret["ARN"],
name=secret["Name"],
region=regional_client.region,
rotation_enabled=secret["RotationEnabled"],
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class Secret(BaseModel):
arn: str
name: str
region: str
rotation_enabled: bool

View File

@@ -0,0 +1,153 @@
import io
import zipfile
from unittest.mock import patch
from boto3 import client, resource, session
from moto import mock_ec2, mock_iam, mock_lambda, mock_s3, mock_secretsmanager
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.secretsmanager.secretsmanager_service import SecretsManager
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch(
"providers.aws.services.secretsmanager.secretsmanager_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_SecretsManager_Service:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=DEFAULT_ACCOUNT_ID,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test SecretsManager Client
@mock_secretsmanager
def test__get_client__(self):
audit_info = self.set_mocked_audit_info()
secretsmanager = SecretsManager(audit_info)
assert (
secretsmanager.regional_clients[AWS_REGION].__class__.__name__
== "SecretsManager"
)
# Test SecretsManager Session
@mock_secretsmanager
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
secretsmanager = SecretsManager(audit_info)
assert secretsmanager.session.__class__.__name__ == "Session"
# Test SecretsManager Service
@mock_secretsmanager
def test__get_service__(self):
audit_info = self.set_mocked_audit_info()
secretsmanager = SecretsManager(audit_info)
assert secretsmanager.service == "secretsmanager"
@mock_secretsmanager
@mock_lambda
@mock_ec2
@mock_iam
@mock_s3
def test__list_secrets__(self):
secretsmanager_client = client("secretsmanager", region_name=AWS_REGION)
# Create Secret
resp = secretsmanager_client.create_secret(
Name="test-secret", SecretString="test-secret"
)
secret_arn = resp["ARN"]
secret_name = resp["Name"]
# Create IAM Lambda Role
iam_client = client("iam", region_name=AWS_REGION)
iam_role = iam_client.create_role(
RoleName="rotation-lambda-role",
AssumeRolePolicyDocument="test-policy",
Path="/",
)["Role"]["Arn"]
# Create S3 Bucket
s3_client = resource("s3", region_name=AWS_REGION)
s3_client.create_bucket(
Bucket="test-bucket",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
)
# Create Lambda Code
zip_output = io.BytesIO()
zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
zip_file.writestr(
"lambda_function.py",
"""
def lambda_handler(event, context):
print("custom log event")
return event
""",
)
zip_file.close()
zip_output.seek(0)
# Create Rotation Lambda
lambda_client = client("lambda", region_name=AWS_REGION)
resp = lambda_client.create_function(
FunctionName="rotation-lambda",
Runtime="python3.7",
Role=iam_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": zip_output.read()},
Description="test lambda function",
Timeout=3,
MemorySize=128,
PackageType="ZIP",
Publish=True,
VpcConfig={
"SecurityGroupIds": ["sg-123abc"],
"SubnetIds": ["subnet-123abc"],
},
)
lambda_arn = resp["FunctionArn"]
# Enable Rotation
secretsmanager_client.rotate_secret(
SecretId=secret_arn,
RotationLambdaARN=lambda_arn,
RotationRules={
"AutomaticallyAfterDays": 90,
"Duration": "3h",
"ScheduleExpression": "rate(10 days)",
},
RotateImmediately=True,
)
# Set partition for the service
audit_info = self.set_mocked_audit_info()
secretsmanager = SecretsManager(audit_info)
assert len(secretsmanager.secrets) == 1
assert secretsmanager.secrets
assert secretsmanager.secrets[secret_name]
assert secretsmanager.secrets[secret_name].name == secret_name
assert secretsmanager.secrets[secret_name].arn == secret_arn
assert secretsmanager.secrets[secret_name].region == AWS_REGION
assert secretsmanager.secrets[secret_name].rotation_enabled is True