diff --git a/providers/aws/services/sns/__init__.py b/providers/aws/services/sns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/sns/check_extra7130 b/providers/aws/services/sns/check_extra7130 deleted file mode 100644 index 20baecac..00000000 --- a/providers/aws/services/sns/check_extra7130 +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_extra7130="7.130" -CHECK_TITLE_extra7130="[extra7130] Ensure there are no SNS Topics unencrypted" -CHECK_SCORED_extra7130="NOT_SCORED" -CHECK_CIS_LEVEL_extra7130="EXTRA" -CHECK_SEVERITY_extra7130="Medium" -CHECK_ASFF_RESOURCE_TYPE_extra7130="AwsSnsTopic" -CHECK_ALTERNATE_check7130="extra7130" -CHECK_SERVICENAME_extra7130="sns" -CHECK_RISK_extra7130='If not enabled sensitive information at rest is not protected.' -CHECK_REMEDIATION_extra7130='Use Amazon SNS with AWS KMS.' -CHECK_DOC_extra7130='https://docs.aws.amazon.com/sns/latest/dg/sns-server-side-encryption.html' -CHECK_CAF_EPIC_extra7130='Data Protection' - -extra7130(){ - for regx in $REGIONS; do - LIST_SNS=$($AWSCLI sns list-topics $PROFILE_OPT --region $regx --query 'Topics[*].TopicArn' --output text 2>&1) - if [[ $(echo "$LIST_SNS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then - textInfo "$regx: Access Denied trying to list topics" "$regx" - continue - fi - if [[ $LIST_SNS ]];then - for topic in $LIST_SNS; do - SHORT_TOPIC=$(echo $topic | awk -F ":" '{print $NF}') - SNS_ENCRYPTION=$($AWSCLI sns get-topic-attributes $PROFILE_OPT --region $regx --topic-arn $topic --query 'Attributes.KmsMasterKeyId' --output text) - if [[ "None" == $SNS_ENCRYPTION ]]; then - textFail "$regx: $SHORT_TOPIC is not encrypted!" "$regx" "$SHORT_TOPIC" - else - textPass "$regx: $SHORT_TOPIC is encrypted" "$regx" "$SHORT_TOPIC" - fi - done - else - textInfo "$regx: No SNS topic found" "$regx" - fi - done -} diff --git a/providers/aws/services/sns/check_extra731 b/providers/aws/services/sns/check_extra731 deleted file mode 100644 index 93a40966..00000000 --- a/providers/aws/services/sns/check_extra731 +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2018) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_extra731="7.31" -CHECK_TITLE_extra731="[extra731] Check if SNS topics have policy set as Public" -CHECK_SCORED_extra731="NOT_SCORED" -CHECK_CIS_LEVEL_extra731="EXTRA" -CHECK_SEVERITY_extra731="Critical" -CHECK_ASFF_RESOURCE_TYPE_extra731="AwsSnsTopic" -CHECK_ALTERNATE_check731="extra731" -CHECK_SERVICENAME_extra731="sns" -CHECK_RISK_extra731='Publicly accessible services could expose sensitive data to bad actors.' -CHECK_REMEDIATION_extra731='Ensure there is a business requirement for service to be public.' -CHECK_DOC_extra731='https://docs.aws.amazon.com/config/latest/developerguide/sns-topic-policy.html' -CHECK_CAF_EPIC_extra731='Infrastructure Security' - -extra731(){ - for regx in $REGIONS; do - LIST_SNS=$($AWSCLI sns list-topics $PROFILE_OPT --region $regx --query Topics --output text 2>&1|grep -v ^None ) - if [[ $(echo "$LIST_SNS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then - textInfo "$regx: Access Denied trying to list topics" "$regx" - continue - fi - if [[ $LIST_SNS ]]; then - for topic in $LIST_SNS; do - SHORT_TOPIC=$(echo $topic| cut -d: -f6) - SNS_POLICY=$($AWSCLI sns get-topic-attributes --topic-arn $topic $PROFILE_OPT --region $regx --query Attributes.Policy 2>/dev/null) - SNS_POLICY_ALLOW_ALL=$(echo $SNS_POLICY \ - | jq '. | fromjson' | jq '.Statement[] | select(.Effect=="Allow") | select(.Principal=="*" or .Principal.AWS=="*" or .Principal.CanonicalUser=="*")') - if [[ $SNS_POLICY_ALLOW_ALL ]]; then - SNS_POLICY_ALLOW_ALL_WITHOUT_CONDITION=$(echo $SNS_POLICY \ - | jq '. | fromjson' | jq '.Statement[] | select(.Effect=="Allow") | select(.Principal=="*" or .Principal.AWS=="*" or .Principal.CanonicalUser=="*") | select(has("Condition") | not)') - if [[ $SNS_POLICY_ALLOW_ALL_WITHOUT_CONDITION ]]; then - SNS_POLICY_ALLOW_ALL_WITHOUT_CONDITION_DETAILS=$(echo $SNS_POLICY_ALLOW_ALL_WITHOUT_CONDITION \ - | jq '"[Principal: " + (.Principal|tostring) + " Action: " + (.Action|tostring) + "]"' ) - textFail "$regx: SNS topic $SHORT_TOPIC's policy with public access" "$regx" "$SHORT_TOPIC" - else - textPass "$regx: SNS topic $SHORT_TOPIC's policy with public access but has a Condition" "$regx" "$SHORT_TOPIC" - fi - else - textPass "$regx: SNS topic without public access" "$regx" - fi - done - else - textInfo "$regx: No SNS topic found" "$regx" - fi - done -} diff --git a/providers/aws/services/sns/sns_client.py b/providers/aws/services/sns/sns_client.py new file mode 100644 index 00000000..d024a6e9 --- /dev/null +++ b/providers/aws/services/sns/sns_client.py @@ -0,0 +1,4 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.sns.sns_service import SNS + +sns_client = SNS(current_audit_info) diff --git a/providers/aws/services/sns/sns_service.py b/providers/aws/services/sns/sns_service.py new file mode 100644 index 00000000..5e89b54d --- /dev/null +++ b/providers/aws/services/sns/sns_service.py @@ -0,0 +1,75 @@ +import threading +from json import loads + +from pydantic import BaseModel + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################################ SNS +class SNS: + def __init__(self, audit_info): + self.service = "sns" + self.session = audit_info.audit_session + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.topics = [] + self.__threading_call__(self.__list_topics__) + self.__get_topic_attributes__(self.regional_clients) + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __list_topics__(self, regional_client): + logger.info("SNS - listing topics...") + try: + list_topics_paginator = regional_client.get_paginator("list_topics") + for page in list_topics_paginator.paginate(): + for topic_arn in page["Topics"]: + self.topics.append( + Topic( + name=topic_arn["TopicArn"].rsplit(":", 1)[1], + arn=topic_arn["TopicArn"], + region=regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __get_topic_attributes__(self, regional_clients): + logger.info("SNS - getting topic attributes...") + try: + for topic in self.topics: + regional_client = regional_clients[topic.region] + topic_attributes = regional_client.get_topic_attributes( + TopicArn=topic.arn + ) + if "Policy" in topic_attributes["Attributes"]: + topic.policy = loads(topic_attributes["Attributes"]["Policy"]) + if "KmsMasterKeyId" in topic_attributes["Attributes"]: + topic.kms_master_key_id = topic_attributes["Attributes"][ + "KmsMasterKeyId" + ] + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class Topic(BaseModel): + name: str + arn: str + region: str + policy: dict = None + kms_master_key_id: str = None diff --git a/providers/aws/services/sns/sns_service_test.py b/providers/aws/services/sns/sns_service_test.py new file mode 100644 index 00000000..8f024edc --- /dev/null +++ b/providers/aws/services/sns/sns_service_test.py @@ -0,0 +1,124 @@ +from json import dumps +from unittest.mock import patch +from uuid import uuid4 + +import botocore +from boto3 import client, session +from moto import mock_sns + +from providers.aws.lib.audit_info.models import AWS_Audit_Info +from providers.aws.services.sns.sns_service import SNS + +AWS_ACCOUNT_NUMBER = 123456789012 +AWS_REGION = "eu-west-1" + +topic_name = "test-topic" +test_policy = { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": f"{AWS_ACCOUNT_NUMBER}"}, + "Action": ["sns:Publish"], + "Resource": f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}", + } + ] +} +kms_key_id = str(uuid4()) + +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwarg): + if operation_name == "GetTopicAttributes": + return { + "Attributes": {"Policy": dumps(test_policy), "KmsMasterKeyId": kms_key_id} + } + return make_api_call(self, operation_name, kwarg) + + +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +@patch( + "providers.aws.services.sns.sns_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_SNS_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + ) + return audit_info + + # Test SNS Service + def test_service(self): + audit_info = self.set_mocked_audit_info() + sns = SNS(audit_info) + assert sns.service == "sns" + + # Test SNS client + def test_client(self): + audit_info = self.set_mocked_audit_info() + sns = SNS(audit_info) + for reg_client in sns.regional_clients.values(): + assert reg_client.__class__.__name__ == "SNS" + + # Test SNS session + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + sns = SNS(audit_info) + assert sns.session.__class__.__name__ == "Session" + + @mock_sns + # Test SNS session + def test__list_topics__(self): + sns_client = client("sns", region_name=AWS_REGION) + sns_client.create_topic(Name=topic_name) + + audit_info = self.set_mocked_audit_info() + sns = SNS(audit_info) + + assert len(sns.topics) == 1 + assert sns.topics[0].name == topic_name + assert ( + sns.topics[0].arn + == f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}" + ) + assert sns.topics[0].region == AWS_REGION + + @mock_sns + # Test SNS session + def test__get_topic_attributes__(self): + sns_client = client("sns", region_name=AWS_REGION) + sns_client.create_topic(Name=topic_name) + + audit_info = self.set_mocked_audit_info() + sns = SNS(audit_info) + + assert len(sns.topics) == 1 + assert ( + sns.topics[0].arn + == f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}" + ) + assert sns.topics[0].region == AWS_REGION + assert sns.topics[0].policy + assert sns.topics[0].kms_master_key_id == kms_key_id diff --git a/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/__init__.py b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.metadata.json b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.metadata.json new file mode 100644 index 00000000..2ff97411 --- /dev/null +++ b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "sns_topics_kms_encryption_at_rest_enabled", + "CheckTitle": "Ensure there are no SNS Topics unencrypted", + "CheckType": [], + "ServiceName": "sns", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:sns:region:account-id:topic", + "Severity": "high", + "ResourceType": "AwsSNSTopic", + "Description": "Ensure there are no SNS Topics unencrypted", + "Risk": "If not enabled sensitive information at rest is not protected.", + "RelatedUrl": "https://docs.aws.amazon.com/sns/latest/dg/sns-server-side-encryption.html", + "Remediation": { + "Code": { + "CLI": "aws sns set-topic-attributes --topic-arn --attribute-name 'KmsMasterKeyId' --attribute-value ", + "NativeIaC": "https://docs.bridgecrew.io/docs/general_15#cloudformation", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/SNS/topic-encrypted-with-kms-customer-master-keys.html", + "Terraform": "https://docs.bridgecrew.io/docs/general_15#terraform" + }, + "Recommendation": { + "Text": "Use Amazon SNS with AWS KMS.", + "Url": "https://docs.aws.amazon.com/sns/latest/dg/sns-server-side-encryption.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] + } diff --git a/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.py b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.py new file mode 100644 index 00000000..66e2f26d --- /dev/null +++ b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled.py @@ -0,0 +1,21 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.sns.sns_client import sns_client + + +class sns_topics_kms_encryption_at_rest_enabled(Check): + def execute(self): + findings = [] + for topic in sns_client.topics: + report = Check_Report(self.metadata) + report.region = topic.region + report.resource_id = topic.name + report.resource_arn = topic.arn + report.status = "PASS" + report.status_extended = f"SNS topic {topic.arn} is encrypted" + if not topic.kms_master_key_id: + report.status = "FAIL" + report.status_extended = f"SNS topic {topic.arn} is not encrypted" + + findings.append(report) + + return findings diff --git a/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled_test.py b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled_test.py new file mode 100644 index 00000000..623224e3 --- /dev/null +++ b/providers/aws/services/sns/sns_topics_kms_encryption_at_rest_enabled/sns_topics_kms_encryption_at_rest_enabled_test.py @@ -0,0 +1,78 @@ +from re import search +from unittest import mock +from uuid import uuid4 + +from providers.aws.services.sns.sns_service import Topic + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" + +kms_key_id = str(uuid4()) +topic_name = "test-topic" +topic_arn = f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}" + + +class Test_sns_topics_kms_encryption_at_rest_enabled: + def test_no_topics(self): + sns_client = mock.MagicMock + sns_client.topics = [] + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_kms_encryption_at_rest_enabled.sns_topics_kms_encryption_at_rest_enabled import ( + sns_topics_kms_encryption_at_rest_enabled, + ) + + check = sns_topics_kms_encryption_at_rest_enabled() + result = check.execute() + assert len(result) == 0 + + def test_topics_with_key(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic( + arn=topic_arn, + name=topic_name, + kms_master_key_id=kms_key_id, + region=AWS_REGION, + ) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_kms_encryption_at_rest_enabled.sns_topics_kms_encryption_at_rest_enabled import ( + sns_topics_kms_encryption_at_rest_enabled, + ) + + check = sns_topics_kms_encryption_at_rest_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert search("is encrypted", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn + + def test_topics_no_key(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic(arn=topic_arn, name=topic_name, region=AWS_REGION) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_kms_encryption_at_rest_enabled.sns_topics_kms_encryption_at_rest_enabled import ( + sns_topics_kms_encryption_at_rest_enabled, + ) + + check = sns_topics_kms_encryption_at_rest_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search("is not encrypted", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn diff --git a/providers/aws/services/sns/sns_topics_not_publicly_accessible/__init__.py b/providers/aws/services/sns/sns_topics_not_publicly_accessible/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.metadata.json b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.metadata.json new file mode 100644 index 00000000..b333ed5f --- /dev/null +++ b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "sns_topics_not_publicly_accessible", + "CheckTitle": "Check if SNS topics have policy set as Public", + "CheckType": [], + "ServiceName": "sns", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:sns:region:account-id:topic", + "Severity": "high", + "ResourceType": "AwsSNSTopic", + "Description": "Check if SNS topics have policy set as Public", + "Risk": "Publicly accessible services could expose sensitive data to bad actors.", + "RelatedUrl": "https://docs.aws.amazon.com/config/latest/developerguide/sns-topic-policy.html", + "Remediation": { + "Code": { + "CLI": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/SNS/topics-everyone-publish.html", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/SNS/topics-everyone-publish.html", + "Terraform": "https://docs.bridgecrew.io/docs/ensure-sns-topic-policy-is-not-public-by-only-allowing-specific-services-or-principals-to-access-it#terraform" + }, + "Recommendation": { + "Text": "Ensure there is a business requirement for service to be public.", + "Url": "https://docs.aws.amazon.com/config/latest/developerguide/sns-topic-policy.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] + } diff --git a/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.py b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.py new file mode 100644 index 00000000..9bf7b4d6 --- /dev/null +++ b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible.py @@ -0,0 +1,41 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.sns.sns_client import sns_client + + +class sns_topics_not_publicly_accessible(Check): + def execute(self): + findings = [] + for topic in sns_client.topics: + report = Check_Report(self.metadata) + report.region = topic.region + report.resource_id = topic.name + report.resource_arn = topic.arn + report.status = "PASS" + report.status_extended = f"SNS topic {topic.name} without public access" + if topic.policy: + for statement in topic.policy["Statement"]: + # Only check allow statements + if statement["Effect"] == "Allow": + if ( + "*" in statement["Principal"] + or ( + "AWS" in statement["Principal"] + and "*" in statement["Principal"]["AWS"] + ) + or ( + "CanonicalUser" in statement["Principal"] + and "*" in statement["Principal"]["CanonicalUser"] + ) + ): + if "Condition" not in statement: + report.status = "FAIL" + report.status_extended = ( + f"SNS topic {topic.name} policy with public access" + ) + else: + report.status = "FAIL" + report.status_extended = f"SNS topic {topic.name} policy with public access but has a Condition" + + findings.append(report) + + return findings diff --git a/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible_test.py b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible_test.py new file mode 100644 index 00000000..d1c942ee --- /dev/null +++ b/providers/aws/services/sns/sns_topics_not_publicly_accessible/sns_topics_not_publicly_accessible_test.py @@ -0,0 +1,165 @@ +from re import search +from unittest import mock +from uuid import uuid4 + +from providers.aws.services.sns.sns_service import Topic + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" + +kms_key_id = str(uuid4()) +topic_name = "test-topic" +topic_arn = f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}" +test_policy_restricted = { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": f"{AWS_ACCOUNT_NUMBER}"}, + "Action": ["sns:Publish"], + "Resource": f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}", + } + ] +} + +test_policy_restricted_condition = { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["sns:Publish"], + "Resource": f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}", + "Condition": {"StringEquals": {"sns:Protocol": "https"}}, + } + ] +} + +test_policy_not_restricted = { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["sns:Publish"], + "Resource": f"arn:aws:sns:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:{topic_name}", + } + ] +} + + +class Test_sns_topics_not_publicly_accessible: + def test_no_topics(self): + sns_client = mock.MagicMock + sns_client.topics = [] + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import ( + sns_topics_not_publicly_accessible, + ) + + check = sns_topics_not_publicly_accessible() + result = check.execute() + assert len(result) == 0 + + def test_topics_not_public(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic( + arn=topic_arn, + name=topic_name, + policy=test_policy_restricted, + region=AWS_REGION, + ) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import ( + sns_topics_not_publicly_accessible, + ) + + check = sns_topics_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert search("without public access", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn + + def test_topics_no_policy(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic(arn=topic_arn, name=topic_name, region=AWS_REGION) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import ( + sns_topics_not_publicly_accessible, + ) + + check = sns_topics_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert search("without public access", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn + + def test_topics_public_with_condition(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic( + arn=topic_arn, + name=topic_name, + policy=test_policy_restricted_condition, + region=AWS_REGION, + ) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import ( + sns_topics_not_publicly_accessible, + ) + + check = sns_topics_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search("but has a Condition", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn + + def test_topics_no_key(self): + sns_client = mock.MagicMock + sns_client.topics = [] + sns_client.topics.append( + Topic( + arn=topic_arn, + name=topic_name, + region=AWS_REGION, + policy=test_policy_not_restricted, + ) + ) + with mock.patch( + "providers.aws.services.sns.sns_service.SNS", + sns_client, + ): + from providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import ( + sns_topics_not_publicly_accessible, + ) + + check = sns_topics_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search("with public access", result[0].status_extended) + assert result[0].resource_id == topic_name + assert result[0].resource_arn == topic_arn