From 684b7fe0b8c70c124b327465fcf04adee32d5534 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Wed, 16 Nov 2022 10:23:05 +0100 Subject: [PATCH] feat(secretsmanager): Service and check (#1483) --- .../aws/services/secretsmanager/__init__.py | 0 .../services/secretsmanager/check_extra7163 | 61 ------- .../__init__.py | 0 ...r_automatic_rotation_enabled.metadata.json | 35 ++++ ...cretsmanager_automatic_rotation_enabled.py | 29 ++++ ...manager_automatic_rotation_enabled_test.py | 95 +++++++++++ .../secretsmanager/secretsmanager_client.py | 4 + .../secretsmanager/secretsmanager_service.py | 56 +++++++ .../secretsmanager_service_test.py | 153 ++++++++++++++++++ 9 files changed, 372 insertions(+), 61 deletions(-) create mode 100644 providers/aws/services/secretsmanager/__init__.py delete mode 100644 providers/aws/services/secretsmanager/check_extra7163 create mode 100644 providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/__init__.py create mode 100644 providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.metadata.json create mode 100644 providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.py create mode 100644 providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled_test.py create mode 100644 providers/aws/services/secretsmanager/secretsmanager_client.py create mode 100644 providers/aws/services/secretsmanager/secretsmanager_service.py create mode 100644 providers/aws/services/secretsmanager/secretsmanager_service_test.py diff --git a/providers/aws/services/secretsmanager/__init__.py b/providers/aws/services/secretsmanager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/secretsmanager/check_extra7163 b/providers/aws/services/secretsmanager/check_extra7163 deleted file mode 100644 index 78b6384c..00000000 --- a/providers/aws/services/secretsmanager/check_extra7163 +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -# Remediation: -# -# https://docs.aws.amazon.com/cli/latest/reference/secretsmanager/rotate-secret.html -# -# rotate-secret -# --secret-id -# [--client-request-token ] -# [--rotation-lambda-arn ] -# [--rotation-rules ] -# [--cli-input-json ] -# [--generate-cli-skeleton ] - - -CHECK_ID_extra7163="7.163" -CHECK_TITLE_extra7163="[extra7163] Check if Secrets Manager key rotation is enabled" -CHECK_SCORED_extra7163="NOT_SCORED" -CHECK_CIS_LEVEL_extra7163="EXTRA" -CHECK_SEVERITY_extra7163="Medium" -CHECK_ASFF_RESOURCE_TYPE_extra7163="AwsSecretsManagerSecret" -CHECK_ALTERNATE_extra7163="extra7163" -CHECK_SERVICENAME_extra7163="secretsmanager" -CHECK_RISK_extra7163="Rotating secrets minimizes exposure to attacks using stolen keys." -CHECK_REMEDIATION_extra7163="Enable key rotation on Secrets Manager key." -CHECK_DOC_extra7163="https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html" -CHECK_CAF_EPIC_extra7163="Data Protection" - -extra7163(){ - # "Check if Secrets Manager key rotation is enabled" - for regx in $REGIONS; do - LIST_OF_SECRETS=$($AWSCLI secretsmanager list-secrets $PROFILE_OPT --region $regx --query 'SecretList[*].Name' --output text 2>&1) - if [[ $(echo "$LIST_OF_SECRETS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then - textInfo "$regx: Access Denied trying to list secrets" "$regx" - continue - fi - if [[ $LIST_OF_SECRETS ]]; then - for secret in $LIST_OF_SECRETS; do - KEY_ROTATION_ENABLED=$($AWSCLI secretsmanager describe-secret $PROFILE_OPT --region $regx --secret-id $secret --output json | jq '.RotationEnabled') - if [[ $KEY_ROTATION_ENABLED == true ]]; then - textPass "$regx: $secret has key rotation enabled." "$regx" "$secret" - else - textFail "$regx: $secret does not have key rotation enabled." "$regx" "$secret" - fi - done - else - textPass "$regx: No Secrets Manager secrets found." "$regx" - fi - done -} diff --git a/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/__init__.py b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.metadata.json b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.metadata.json new file mode 100644 index 00000000..190b7333 --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "secretsmanager_automatic_rotation_enabled", + "CheckTitle": "Check if Secrets Manager key rotation is enabled.", + "CheckType": [], + "ServiceName": "secretsmanager", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:secretsmanager:region:account-id:secret:secret-name", + "Severity": "medium", + "ResourceType": "AwsSecretsManagerSecret", + "Description": "Check if Secrets Manager key rotation is enabled.", + "Risk": "Rotating secrets minimizes exposure to attacks using stolen keys.", + "RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html", + "Remediation": { + "Code": { + "CLI": "aws secretsmanager rotate-secret --region --secret-id --rotation-lambda-arn --rotation-rules AutomaticallyAfterDays=30", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Implement automated detective control to scan accounts for passwords and secrets. Use secrets manager service to store and retrieve passwords and secrets.", + "Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets_strategies.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "Infrastructure Protection", + "Compliance": [] +} \ No newline at end of file diff --git a/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.py b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.py new file mode 100644 index 00000000..941321a3 --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled.py @@ -0,0 +1,29 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.secretsmanager.secretsmanager_client import ( + secretsmanager_client, +) + + +class secretsmanager_automatic_rotation_enabled(Check): + def execute(self): + findings = [] + for secret in secretsmanager_client.secrets.values(): + report = Check_Report(self.metadata) + report.region = secret.region + report.resource_id = secret.name + report.resource_arn = secret.arn + + if secret.rotation_enabled: + report.status = "PASS" + report.status_extended = ( + f"SecretsManager secret {secret.name} has rotation enabled." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"SecretsManager secret {secret.name} has rotation disabled." + ) + + findings.append(report) + + return findings diff --git a/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled_test.py b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled_test.py new file mode 100644 index 00000000..5e91945d --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_automatic_rotation_enabled/secretsmanager_automatic_rotation_enabled_test.py @@ -0,0 +1,95 @@ +from unittest import mock + +from moto.core import DEFAULT_ACCOUNT_ID + +from providers.aws.services.secretsmanager.secretsmanager_service import Secret + +# Mock Test Region +AWS_REGION = "eu-west-1" + + +class Test_secretsmanager_automatic_rotation_enabled: + def test_no_secrets(self): + secretsmanager_client = mock.MagicMock + secretsmanager_client.secrets = {} + with mock.patch( + "providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager", + new=secretsmanager_client, + ): + # Test Check + from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import ( + secretsmanager_automatic_rotation_enabled, + ) + + check = secretsmanager_automatic_rotation_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_secret_rotation_disabled(self): + secretsmanager_client = mock.MagicMock + secret_name = "test-secret" + secret_arn = f"arn:aws:secretsmanager:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:secret:{secret_name}" + secretsmanager_client.secrets = { + secret_name: Secret( + arn=secret_arn, + region=AWS_REGION, + name=secret_name, + rotation_enabled=False, + ) + } + with mock.patch( + "providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager", + new=secretsmanager_client, + ): + # Test Check + from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import ( + secretsmanager_automatic_rotation_enabled, + ) + + check = secretsmanager_automatic_rotation_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].region == AWS_REGION + assert result[0].resource_id == secret_name + assert result[0].resource_arn == secret_arn + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"SecretsManager secret {secret_name} has rotation disabled." + ) + + def test_secret_rotation_enabled(self): + secretsmanager_client = mock.MagicMock + secret_name = "test-secret" + secret_arn = f"arn:aws:secretsmanager:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:secret:{secret_name}" + secretsmanager_client.secrets = { + secret_name: Secret( + arn=secret_arn, + region=AWS_REGION, + name=secret_name, + rotation_enabled=True, + ) + } + with mock.patch( + "providers.aws.services.secretsmanager.secretsmanager_service.SecretsManager", + new=secretsmanager_client, + ): + # Test Check + from providers.aws.services.secretsmanager.secretsmanager_automatic_rotation_enabled.secretsmanager_automatic_rotation_enabled import ( + secretsmanager_automatic_rotation_enabled, + ) + + check = secretsmanager_automatic_rotation_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].region == AWS_REGION + assert result[0].resource_id == secret_name + assert result[0].resource_arn == secret_arn + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"SecretsManager secret {secret_name} has rotation enabled." + ) diff --git a/providers/aws/services/secretsmanager/secretsmanager_client.py b/providers/aws/services/secretsmanager/secretsmanager_client.py new file mode 100644 index 00000000..76a630d7 --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_client.py @@ -0,0 +1,4 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.secretsmanager.secretsmanager_service import SecretsManager + +secretsmanager_client = SecretsManager(current_audit_info) diff --git a/providers/aws/services/secretsmanager/secretsmanager_service.py b/providers/aws/services/secretsmanager/secretsmanager_service.py new file mode 100644 index 00000000..07851d36 --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_service.py @@ -0,0 +1,56 @@ +import threading + +from pydantic import BaseModel + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################## SecretsManager +class SecretsManager: + def __init__(self, audit_info): + self.service = "secretsmanager" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.secrets = {} + self.__threading_call__(self.__list_secrets__) + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __list_secrets__(self, regional_client): + logger.info("SecretsManager - Listing Secrets...") + try: + list_secrets_paginator = regional_client.get_paginator("list_secrets") + for page in list_secrets_paginator.paginate(): + for secret in page["SecretList"]: + self.secrets[secret["Name"]] = Secret( + arn=secret["ARN"], + name=secret["Name"], + region=regional_client.region, + rotation_enabled=secret["RotationEnabled"], + ) + + except Exception as error: + logger.error( + f"{regional_client.region} --" + f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:" + f" {error}" + ) + + +class Secret(BaseModel): + arn: str + name: str + region: str + rotation_enabled: bool diff --git a/providers/aws/services/secretsmanager/secretsmanager_service_test.py b/providers/aws/services/secretsmanager/secretsmanager_service_test.py new file mode 100644 index 00000000..92475fc4 --- /dev/null +++ b/providers/aws/services/secretsmanager/secretsmanager_service_test.py @@ -0,0 +1,153 @@ +import io +import zipfile +from unittest.mock import patch + +from boto3 import client, resource, session +from moto import mock_ec2, mock_iam, mock_lambda, mock_s3, mock_secretsmanager +from moto.core import DEFAULT_ACCOUNT_ID + +from providers.aws.lib.audit_info.models import AWS_Audit_Info +from providers.aws.services.secretsmanager.secretsmanager_service import SecretsManager + +# Mock Test Region +AWS_REGION = "eu-west-1" + + +# Mock generate_regional_clients() +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch( + "providers.aws.services.secretsmanager.secretsmanager_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_SecretsManager_Service: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=DEFAULT_ACCOUNT_ID, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + ) + return audit_info + + # Test SecretsManager Client + @mock_secretsmanager + def test__get_client__(self): + audit_info = self.set_mocked_audit_info() + secretsmanager = SecretsManager(audit_info) + assert ( + secretsmanager.regional_clients[AWS_REGION].__class__.__name__ + == "SecretsManager" + ) + + # Test SecretsManager Session + @mock_secretsmanager + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + secretsmanager = SecretsManager(audit_info) + assert secretsmanager.session.__class__.__name__ == "Session" + + # Test SecretsManager Service + @mock_secretsmanager + def test__get_service__(self): + audit_info = self.set_mocked_audit_info() + secretsmanager = SecretsManager(audit_info) + assert secretsmanager.service == "secretsmanager" + + @mock_secretsmanager + @mock_lambda + @mock_ec2 + @mock_iam + @mock_s3 + def test__list_secrets__(self): + secretsmanager_client = client("secretsmanager", region_name=AWS_REGION) + # Create Secret + resp = secretsmanager_client.create_secret( + Name="test-secret", SecretString="test-secret" + ) + secret_arn = resp["ARN"] + secret_name = resp["Name"] + # Create IAM Lambda Role + iam_client = client("iam", region_name=AWS_REGION) + iam_role = iam_client.create_role( + RoleName="rotation-lambda-role", + AssumeRolePolicyDocument="test-policy", + Path="/", + )["Role"]["Arn"] + # Create S3 Bucket + s3_client = resource("s3", region_name=AWS_REGION) + s3_client.create_bucket( + Bucket="test-bucket", + CreateBucketConfiguration={"LocationConstraint": AWS_REGION}, + ) + # Create Lambda Code + zip_output = io.BytesIO() + zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) + zip_file.writestr( + "lambda_function.py", + """ + def lambda_handler(event, context): + print("custom log event") + return event + """, + ) + zip_file.close() + zip_output.seek(0) + # Create Rotation Lambda + lambda_client = client("lambda", region_name=AWS_REGION) + resp = lambda_client.create_function( + FunctionName="rotation-lambda", + Runtime="python3.7", + Role=iam_role, + Handler="lambda_function.lambda_handler", + Code={"ZipFile": zip_output.read()}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + PackageType="ZIP", + Publish=True, + VpcConfig={ + "SecurityGroupIds": ["sg-123abc"], + "SubnetIds": ["subnet-123abc"], + }, + ) + lambda_arn = resp["FunctionArn"] + # Enable Rotation + secretsmanager_client.rotate_secret( + SecretId=secret_arn, + RotationLambdaARN=lambda_arn, + RotationRules={ + "AutomaticallyAfterDays": 90, + "Duration": "3h", + "ScheduleExpression": "rate(10 days)", + }, + RotateImmediately=True, + ) + + # Set partition for the service + audit_info = self.set_mocked_audit_info() + secretsmanager = SecretsManager(audit_info) + + assert len(secretsmanager.secrets) == 1 + assert secretsmanager.secrets + assert secretsmanager.secrets[secret_name] + assert secretsmanager.secrets[secret_name].name == secret_name + assert secretsmanager.secrets[secret_name].arn == secret_arn + assert secretsmanager.secrets[secret_name].region == AWS_REGION + assert secretsmanager.secrets[secret_name].rotation_enabled is True