diff --git a/Pipfile b/Pipfile index 6d5e68cb..01f1d2c9 100644 --- a/Pipfile +++ b/Pipfile @@ -26,6 +26,7 @@ pytest = "7.1.2" pytest-xdist = "2.5.0" coverage = "6.4.1" sure = "2.0.0" +freezegun = "1.2.1" [requires] python_version = "3.9" diff --git a/Pipfile.lock b/Pipfile.lock index 630636b6..74e5e14d 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "4e3096e19f235b38d957c39849b152deb4a0ffaa4af22dfcb49ec6446b190918" + "sha256": "881edb3306efd59b84c75bd2bff5acbc29397eb7f12321203c62a013f0491e2e" }, "pipfile-spec": 6, "requires": { @@ -26,26 +26,26 @@ }, "boto3": { "hashes": [ - "sha256:b39303fdda9b5d77a152e3ec9f264ae318ccdaa853eaf694626dc335464ded98", - "sha256:c02fc93a926944b4b426a170d2dae274b4c8c09ec5259450b94269a8ce990dd7" + "sha256:15b059251990706c5d5a556c42a6e6e781a51edadf48a42afdd06bd313adfaf2", + "sha256:27efa5eb229364bc4643d9e182de0891f73c21b65ef80b5bd02a977caeaf595c" ], "index": "pypi", - "version": "==1.26.8" + "version": "==1.26.9" }, "botocore": { "hashes": [ - "sha256:48cf33d7c513320711321c3b303b0c9810b23e15fa03424f7323883e4ce6cef8", - "sha256:9c6adcf4e080be63b92f50d01e176ef2d1d2a3da7d8387a964abb9eb65fc8aad" + "sha256:01f168e2418419a6d8b335ecc4330faa6a7332d5a097029fc9b4a3ae3c41cea2", + "sha256:c876a7c6a07d7f86f46a8cc765f0d38999b84ac1d7b0666d6ce08c0503e13145" ], "index": "pypi", - "version": "==1.29.8" + "version": "==1.29.9" }, "certifi": { "hashes": [ "sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14", "sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382" ], - "markers": "python_version >= '3.6'", + "markers": "python_full_version >= '3.6.0'", "version": "==2022.9.24" }, "charset-normalizer": { @@ -196,7 +196,7 @@ "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174", "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5" ], - "markers": "python_version >= '3.6'", + "markers": "python_full_version >= '3.6.0'", "version": "==6.0" }, "requests": { @@ -309,26 +309,26 @@ }, "boto3": { "hashes": [ - "sha256:b39303fdda9b5d77a152e3ec9f264ae318ccdaa853eaf694626dc335464ded98", - "sha256:c02fc93a926944b4b426a170d2dae274b4c8c09ec5259450b94269a8ce990dd7" + "sha256:15b059251990706c5d5a556c42a6e6e781a51edadf48a42afdd06bd313adfaf2", + "sha256:27efa5eb229364bc4643d9e182de0891f73c21b65ef80b5bd02a977caeaf595c" ], "index": "pypi", - "version": "==1.26.8" + "version": "==1.26.9" }, "botocore": { "hashes": [ - "sha256:48cf33d7c513320711321c3b303b0c9810b23e15fa03424f7323883e4ce6cef8", - "sha256:9c6adcf4e080be63b92f50d01e176ef2d1d2a3da7d8387a964abb9eb65fc8aad" + "sha256:01f168e2418419a6d8b335ecc4330faa6a7332d5a097029fc9b4a3ae3c41cea2", + "sha256:c876a7c6a07d7f86f46a8cc765f0d38999b84ac1d7b0666d6ce08c0503e13145" ], "index": "pypi", - "version": "==1.29.8" + "version": "==1.29.9" }, "certifi": { "hashes": [ "sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14", "sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382" ], - "markers": "python_version >= '3.6'", + "markers": "python_full_version >= '3.6.0'", "version": "==2022.9.24" }, "cffi": { @@ -530,11 +530,11 @@ }, "exceptiongroup": { "hashes": [ - "sha256:a31cd183c3dea02e617aab5153588d5f7258a77b51f0ef41b3815ae8a0d0f695", - "sha256:c22f11ec6a10d2b453871c5c5fe887436c4d1961324ce9090f2ca6ddc4180c27" + "sha256:6002703c7d31fb9950ddc8780840f67880c440895dc1151dd551553aa1246e4a", + "sha256:76cac74b5207c5997678a1c7105cb6f14213c9c63c096a38cfcb529d83ce5c02" ], "markers": "python_version < '3.11'", - "version": "==1.0.2" + "version": "==1.0.3" }, "execnet": { "hashes": [ @@ -552,6 +552,14 @@ "index": "pypi", "version": "==5.0.4" }, + "freezegun": { + "hashes": [ + "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446", + "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f" + ], + "index": "pypi", + "version": "==1.2.2" + }, "gitdb": { "hashes": [ "sha256:8033ad4e853066ba6ca92050b9df2f89301b8fc8bf7e9324d412a63f8bf1a8fd", @@ -596,7 +604,7 @@ "sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7", "sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951" ], - "markers": "python_version < '4.0' and python_full_version >= '3.6.1'", + "markers": "python_full_version >= '3.6.1' and python_full_version < '4.0.0'", "version": "==5.10.1" }, "jinja2": { @@ -628,7 +636,7 @@ "sha256:1e525177574c23ae0f55cd62382632a083a0339928f0ca846a975a4da9851cec", "sha256:780a22d517cdc857d9714a80d8349c546945063f20853ea32ba7f85bc643ec7d" ], - "markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'", + "markers": "python_version >= '3.7' and python_full_version < '4.0.0'", "version": "==0.1.2" }, "lazy-object-proxy": { @@ -738,7 +746,7 @@ "sha256:34fbd14b7501abe25e64d7b4624a9db02cde1a578d285b3da6f34b290cdf0b3a", "sha256:7cf27585dd7970b7257cefe48e1a3a10d4e34421831bdb472d96967433bc27bd" ], - "markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'", + "markers": "python_version >= '3.7' and python_full_version < '4.0.0'", "version": "==0.3.4" }, "openapi-spec-validator": { @@ -762,7 +770,7 @@ "sha256:5c869d315be50776cc8a993f3af43e0c60dc01506b399643f919034ebf4cdcab", "sha256:cdd7b1f9d7d5c8b8d3315dbf5a86b2596053ae845f056f57d97c0eefff84da14" ], - "markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'", + "markers": "python_version >= '3.7' and python_full_version < '4.0.0'", "version": "==0.4.3" }, "pathspec": { @@ -938,7 +946,7 @@ "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174", "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5" ], - "markers": "python_version >= '3.6'", + "markers": "python_full_version >= '3.6.0'", "version": "==6.0" }, "requests": { diff --git a/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/__init__.py b/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/codeartifact_packages_external_public_publishing_disabled_test.py b/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/codeartifact_packages_external_public_publishing_disabled_test.py index 92af80a9..129b7f42 100644 --- a/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/codeartifact_packages_external_public_publishing_disabled_test.py +++ b/providers/aws/services/codeartifact/codeartifact_packages_external_public_publishing_disabled/codeartifact_packages_external_public_publishing_disabled_test.py @@ -17,7 +17,7 @@ from providers.aws.services.codeartifact.codeartifact_service import ( AWS_REGION = "eu-west-1" -class Test_accessanalyzer_enabled_without_findings: +class Test_codeartifact_packages_external_public_publishing_disabled: def test_no_repositories(self): codeartifact_client = mock.MagicMock codeartifact_client.repositories = {} diff --git a/providers/aws/services/directoryservice/__init__.py b/providers/aws/services/directoryservice/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_client.py b/providers/aws/services/directoryservice/directoryservice_client.py new file mode 100644 index 00000000..cd14c5bb --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_client.py @@ -0,0 +1,6 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.directoryservice.directoryservice_service import ( + DirectoryService, +) + +directoryservice_client = DirectoryService(current_audit_info) diff --git a/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/__init__.py b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.metadata.json b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.metadata.json new file mode 100644 index 00000000..448d34c0 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_directory_log_forwarding_enabled", + "CheckTitle": "Directory Service monitoring with CloudWatch logs.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "medium", + "ResourceType": "AwsDirectoryService", + "Description": "Directory Service monitoring with CloudWatch logs.", + "Risk": "As a best practice, monitor your organization to ensure that changes are logged. This helps you to ensure that any unexpected change can be investigated and unwanted changes can be rolled back.", + "RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/incident-response.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "It is recommended that that the export of logs is enabled.", + "Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/incident-response.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.py b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.py new file mode 100644 index 00000000..bb355325 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled.py @@ -0,0 +1,23 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) + + +class directoryservice_directory_log_forwarding_enabled(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = directory.name + if directory.log_subscriptions: + report.status = "PASS" + report.status_extended = f"Directory Service {directory.name} have log forwarding to CloudWatch enabled" + else: + report.status = "FAIL" + report.status_extended = f"Directory Service {directory.name} have log forwarding to CloudWatch disabled" + + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled_test.py b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled_test.py new file mode 100644 index 00000000..61bce056 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_log_forwarding_enabled/directoryservice_directory_log_forwarding_enabled_test.py @@ -0,0 +1,95 @@ +from datetime import datetime +from unittest import mock + +from providers.aws.services.directoryservice.directoryservice_service import ( + Directory, + LogSubscriptions, +) + +AWS_REGION = "eu-west-1" + + +class Test_directoryservice_directory_log_forwarding_enabled: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import ( + directoryservice_directory_log_forwarding_enabled, + ) + + check = directoryservice_directory_log_forwarding_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_one_directory_logging_disabled(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + log_subscriptions=[], + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import ( + directoryservice_directory_log_forwarding_enabled, + ) + + check = directoryservice_directory_log_forwarding_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} have log forwarding to CloudWatch disabled" + ) + + def test_one_directory_logging_enabled(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + log_subscriptions=[ + LogSubscriptions( + log_group_name="test-log-group", + created_date_time=datetime(2022, 1, 1), + ) + ], + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import ( + directoryservice_directory_log_forwarding_enabled, + ) + + check = directoryservice_directory_log_forwarding_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} have log forwarding to CloudWatch enabled" + ) diff --git a/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/__init__.py b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.metadata.json b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.metadata.json new file mode 100644 index 00000000..77f81aa4 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_directory_monitor_notifications", + "CheckTitle": "Directory Service has SNS Notifications enabled.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "medium", + "ResourceType": "AwsDirectoryService", + "Description": "Directory Service has SNS Notifications enabled.", + "Risk": "As a best practice, monitor status of Directory Service. This helps to avoid late actions to fix Directory Service issues.", + "RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_enable_notifications.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "It is recommended set up SNS messaging to send email or text messages when the status of your directory changes.", + "Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_enable_notifications.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.py b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.py new file mode 100644 index 00000000..0c4f5660 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications.py @@ -0,0 +1,27 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) + + +class directoryservice_directory_monitor_notifications(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = directory.name + if directory.event_topics: + report.status = "PASS" + report.status_extended = ( + f"Directory Service {directory.name} have SNS messaging enabled" + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"Directory Service {directory.name} have SNS messaging disabled" + ) + + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications_test.py b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications_test.py new file mode 100644 index 00000000..7450f9f6 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_monitor_notifications/directoryservice_directory_monitor_notifications_test.py @@ -0,0 +1,100 @@ +from datetime import datetime +from unittest import mock + +from moto.core import DEFAULT_ACCOUNT_ID + +from providers.aws.services.directoryservice.directoryservice_service import ( + Directory, + EventTopics, + EventTopicStatus, +) + +AWS_REGION = "eu-west-1" + + +class Test_directoryservice_directory_monitor_notifications: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import ( + directoryservice_directory_monitor_notifications, + ) + + check = directoryservice_directory_monitor_notifications() + result = check.execute() + + assert len(result) == 0 + + def test_one_directory_logging_disabled(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + event_topics=[], + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import ( + directoryservice_directory_monitor_notifications, + ) + + check = directoryservice_directory_monitor_notifications() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} have SNS messaging disabled" + ) + + def test_one_directory_logging_enabled(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + event_topics=[ + EventTopics( + topic_arn=f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic", + topic_name="test-topic", + status=EventTopicStatus.Registered, + created_date_time=datetime(2022, 1, 1), + ) + ], + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import ( + directoryservice_directory_monitor_notifications, + ) + + check = directoryservice_directory_monitor_notifications() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} have SNS messaging enabled" + ) diff --git a/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/__init__.py b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.metadata.json b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.metadata.json new file mode 100644 index 00000000..85efa916 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_directory_snapshots_limit", + "CheckTitle": "Directory Service Manual Snapshots limit reached.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "low", + "ResourceType": "AwsDirectoryService", + "Description": "Directory Service Manual Snapshots limit reached.", + "Risk": "A limit reached can bring unwanted results. The maximum number of manual snapshots is a hard limit.", + "RelatedUrl": "https://docs.aws.amazon.com/general/latest/gr/ds_region.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Monitor manual snapshots limit to ensure capacity when you need it.", + "Url": "https://docs.aws.amazon.com/general/latest/gr/ds_region.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.py b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.py new file mode 100644 index 00000000..2d2a878b --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit.py @@ -0,0 +1,33 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) + +SNAPSHOT_LIMIT_THRESHOLD = 2 +"""Number of remaining snapshots to reach the limit""" + + +class directoryservice_directory_snapshots_limit(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = directory.name + if directory.snapshots_limits.manual_snapshots_limit_reached: + report.status = "FAIL" + report.status_extended = f"Directory Service {directory.name} reached {directory.snapshots_limits.manual_snapshots_limit} Snapshots limit" + else: + limit_remaining = ( + directory.snapshots_limits.manual_snapshots_limit + - directory.snapshots_limits.manual_snapshots_current_count + ) + if limit_remaining <= SNAPSHOT_LIMIT_THRESHOLD: + report.status = "FAIL" + report.status_extended = f"Directory Service {directory.name} is about to reach {directory.snapshots_limits.manual_snapshots_limit} Snapshots which is the limit" + else: + report.status = "PASS" + report.status_extended = f"Directory Service {directory.name} is using {directory.snapshots_limits.manual_snapshots_current_count} out of {directory.snapshots_limits.manual_snapshots_limit} from the Snapshots Limit" + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit_test.py b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit_test.py new file mode 100644 index 00000000..058f82bc --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_directory_snapshots_limit/directoryservice_directory_snapshots_limit_test.py @@ -0,0 +1,179 @@ +from unittest import mock + +from providers.aws.services.directoryservice.directoryservice_service import ( + Directory, + SnapshotLimit, +) + +AWS_REGION = "eu-west-1" + + +class Test_directoryservice_directory_snapshots_limit: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import ( + directoryservice_directory_snapshots_limit, + ) + + check = directoryservice_directory_snapshots_limit() + result = check.execute() + + assert len(result) == 0 + + def test_one_directory_snapshots_limit_reached(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + manual_snapshots_current_count = 5 + manual_snapshots_limit = 5 + manual_snapshots_limit_reached = True + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + snapshots_limits=SnapshotLimit( + manual_snapshots_current_count=manual_snapshots_current_count, + manual_snapshots_limit=manual_snapshots_limit, + manual_snapshots_limit_reached=manual_snapshots_limit_reached, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import ( + directoryservice_directory_snapshots_limit, + ) + + check = directoryservice_directory_snapshots_limit() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} reached {manual_snapshots_limit} Snapshots limit" + ) + + def test_one_directory_snapshots_limit_over_threshold(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + manual_snapshots_current_count = 4 + manual_snapshots_limit = 5 + manual_snapshots_limit_reached = False + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + snapshots_limits=SnapshotLimit( + manual_snapshots_current_count=manual_snapshots_current_count, + manual_snapshots_limit=manual_snapshots_limit, + manual_snapshots_limit_reached=manual_snapshots_limit_reached, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import ( + directoryservice_directory_snapshots_limit, + ) + + check = directoryservice_directory_snapshots_limit() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} is about to reach {manual_snapshots_limit} Snapshots which is the limit" + ) + + def test_one_directory_snapshots_limit_equal_threshold(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + manual_snapshots_current_count = 3 + manual_snapshots_limit = 5 + manual_snapshots_limit_reached = False + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + snapshots_limits=SnapshotLimit( + manual_snapshots_current_count=manual_snapshots_current_count, + manual_snapshots_limit=manual_snapshots_limit, + manual_snapshots_limit_reached=manual_snapshots_limit_reached, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import ( + directoryservice_directory_snapshots_limit, + ) + + check = directoryservice_directory_snapshots_limit() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} is about to reach {manual_snapshots_limit} Snapshots which is the limit" + ) + + def test_one_directory_snapshots_limit_more_threshold(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + manual_snapshots_current_count = 1 + manual_snapshots_limit = 5 + manual_snapshots_limit_reached = False + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + snapshots_limits=SnapshotLimit( + manual_snapshots_current_count=manual_snapshots_current_count, + manual_snapshots_limit=manual_snapshots_limit, + manual_snapshots_limit_reached=manual_snapshots_limit_reached, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import ( + directoryservice_directory_snapshots_limit, + ) + + check = directoryservice_directory_snapshots_limit() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == "test-directory" + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Directory Service {directory_name} is using {manual_snapshots_current_count} out of {manual_snapshots_limit} from the Snapshots Limit" + ) diff --git a/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/__init__.py b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.metadata.json b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.metadata.json new file mode 100644 index 00000000..3cddfd00 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_ldap_certificate_expiration", + "CheckTitle": "Directory Service LDAP Certificates expiration.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "medium", + "ResourceType": "AwsDirectoryService", + "Description": "Directory Service Manual Snapshots limit reached.", + "Risk": "Expired certificates can impact service availability.", + "RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_ldap.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Monitor certificate expiration and take automated action to alarm responsible team for taking care of the replacement or remove.", + "Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_ldap.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.py b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.py new file mode 100644 index 00000000..390a67f2 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration.py @@ -0,0 +1,33 @@ +from datetime import datetime + +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) + +DAYS_TO_EXPIRE_THRESHOLD = 90 +"""Number of days to notify about a certificate expiration""" + + +class directoryservice_ldap_certificate_expiration(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + for certificate in directory.certificates: + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = certificate.id + + remaining_days_to_expire = ( + certificate.expiry_date_time - datetime.today() + ).days + if remaining_days_to_expire <= DAYS_TO_EXPIRE_THRESHOLD: + report.status = "FAIL" + report.status_extended = f"LDAP Certificate {certificate.id} configured at {directory.name} is about to expire in {remaining_days_to_expire} days" + else: + report.status = "PASS" + report.status_extended = f"LDAP Certificate {certificate.id} configured at {directory.name} expires in {remaining_days_to_expire} days" + + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration_test.py b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration_test.py new file mode 100644 index 00000000..b3cd8c31 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_ldap_certificate_expiration/directoryservice_ldap_certificate_expiration_test.py @@ -0,0 +1,187 @@ +from datetime import datetime +from unittest import mock + +from freezegun import freeze_time + +from providers.aws.services.directoryservice.directoryservice_service import ( + Certificate, + CertificateState, + CertificateType, + Directory, +) + +AWS_REGION = "eu-west-1" + + +# Always use a mocked date to test the certificates expiration +@freeze_time("2023-01-01") +class Test_directoryservice_ldap_certificate_expiration: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import ( + directoryservice_ldap_certificate_expiration, + ) + + check = directoryservice_ldap_certificate_expiration() + result = check.execute() + + assert len(result) == 0 + + def test_directory_no_certificate(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + certificates=[], + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import ( + directoryservice_ldap_certificate_expiration, + ) + + check = directoryservice_ldap_certificate_expiration() + result = check.execute() + + assert len(result) == 0 + + def test_directory_certificate_expires_in_365_days(self): + remaining_days_to_expire = 365 + + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + certificate_id = "test-certificate" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + certificates=[ + Certificate( + id=certificate_id, + common_name=certificate_id, + state=CertificateState.Registered, + type=CertificateType.ClientLDAPS, + expiry_date_time=datetime(2024, 1, 1), + ) + ], + ) + } + + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import ( + directoryservice_ldap_certificate_expiration, + ) + + check = directoryservice_ldap_certificate_expiration() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == certificate_id + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"LDAP Certificate {certificate_id} configured at {directory_name} expires in {remaining_days_to_expire} days" + ) + + def test_directory_certificate_expires_in_90_days(self): + remaining_days_to_expire = 90 + + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + certificate_id = "test-certificate" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + certificates=[ + Certificate( + id=certificate_id, + common_name=certificate_id, + state=CertificateState.Registered, + type=CertificateType.ClientLDAPS, + expiry_date_time=datetime(2023, 4, 1), + ) + ], + ) + } + + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import ( + directoryservice_ldap_certificate_expiration, + ) + + check = directoryservice_ldap_certificate_expiration() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == certificate_id + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"LDAP Certificate {certificate_id} configured at {directory_name} is about to expire in {remaining_days_to_expire} days" + ) + + def test_directory_certificate_expires_in_31_days(self): + remaining_days_to_expire = 31 + + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + certificate_id = "test-certificate" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + certificates=[ + Certificate( + id=certificate_id, + common_name=certificate_id, + state=CertificateState.Registered, + type=CertificateType.ClientLDAPS, + expiry_date_time=datetime(2023, 2, 1), + ) + ], + ) + } + + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import ( + directoryservice_ldap_certificate_expiration, + ) + + check = directoryservice_ldap_certificate_expiration() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == certificate_id + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"LDAP Certificate {certificate_id} configured at {directory_name} is about to expire in {remaining_days_to_expire} days" + ) diff --git a/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/__init__.py b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.metadata.json b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.metadata.json new file mode 100644 index 00000000..700e3825 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_radius_server_security_protocol", + "CheckTitle": "Ensure Radius server in DS is using the recommended security protocol.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "medium", + "ResourceType": "AwsDirectoryService", + "Description": "Ensure Radius server in DS is using the recommended security protocol.", + "Risk": "As a best practice, you might need to configure the authentication protocol between the Microsoft AD DCs and the RADIUS/MFA server. Supported protocols are PAP, CHAP MS-CHAPv1, and MS-CHAPv2. MS-CHAPv2 is recommended because it provides the strongest security of the three options.", + "RelatedUrl": "https://aws.amazon.com/blogs/security/how-to-enable-multi-factor-authentication-for-amazon-workspaces-and-amazon-quicksight-by-using-microsoft-ad-and-on-premises-credentials/", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "MS-CHAPv2 provides the strongest security of the options supported, and is therefore recommended.", + "Url": "https://aws.amazon.com/blogs/security/how-to-enable-multi-factor-authentication-for-amazon-workspaces-and-amazon-quicksight-by-using-microsoft-ad-and-on-premises-credentials/" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.py b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.py new file mode 100644 index 00000000..9501bd78 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol.py @@ -0,0 +1,30 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) +from providers.aws.services.directoryservice.directoryservice_service import ( + AuthenticationProtocol, +) + + +class directoryservice_radius_server_security_protocol(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + if directory.radius_settings: + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = directory.name + if ( + directory.radius_settings.authentication_protocol + == AuthenticationProtocol.MS_CHAPv2 + ): + report.status = "PASS" + report.status_extended = f"Radius server of Directory {directory.name} have recommended security protocol for the Radius server" + else: + report.status = "FAIL" + report.status_extended = f"Radius server of Directory {directory.name} does not have recommended security protocol for the Radius server" + + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol_test.py b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol_test.py new file mode 100644 index 00000000..48e1e0c1 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_radius_server_security_protocol/directoryservice_radius_server_security_protocol_test.py @@ -0,0 +1,121 @@ +from unittest import mock + +from providers.aws.services.directoryservice.directoryservice_service import ( + AuthenticationProtocol, + Directory, + RadiusSettings, + RadiusStatus, +) + +AWS_REGION = "eu-west-1" + + +class Test_directoryservice_radius_server_security_protocol: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import ( + directoryservice_radius_server_security_protocol, + ) + + check = directoryservice_radius_server_security_protocol() + result = check.execute() + + assert len(result) == 0 + + def test_directory_no_radius_server(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=None, + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import ( + directoryservice_radius_server_security_protocol, + ) + + check = directoryservice_radius_server_security_protocol() + result = check.execute() + + assert len(result) == 0 + + def test_directory_radius_server_bad_auth_protocol(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=RadiusSettings( + authentication_protocol=AuthenticationProtocol.MS_CHAPv1, + status=RadiusStatus.Completed, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import ( + directoryservice_radius_server_security_protocol, + ) + + check = directoryservice_radius_server_security_protocol() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == directory_name + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Radius server of Directory {directory_name} does not have recommended security protocol for the Radius server" + ) + + def test_directory_radius_server_secure_auth_protocol(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=RadiusSettings( + authentication_protocol=AuthenticationProtocol.MS_CHAPv2, + status=RadiusStatus.Completed, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import ( + directoryservice_radius_server_security_protocol, + ) + + check = directoryservice_radius_server_security_protocol() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == directory_name + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Radius server of Directory {directory_name} have recommended security protocol for the Radius server" + ) diff --git a/providers/aws/services/directoryservice/directoryservice_service.py b/providers/aws/services/directoryservice/directoryservice_service.py new file mode 100644 index 00000000..26c7e0bd --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_service.py @@ -0,0 +1,254 @@ +import threading +from datetime import datetime +from enum import Enum +from typing import Union + +from pydantic import BaseModel + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################## DirectoryService +class DirectoryService: + def __init__(self, audit_info): + self.service = "ds" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.directories = {} + self.__threading_call__(self.__describe_directories__) + self.__threading_call__(self.__list_log_subscriptions__) + self.__threading_call__(self.__describe_event_topics__) + self.__threading_call__(self.__list_certificates__) + self.__threading_call__(self.__get_snapshot_limits__) + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __describe_directories__(self, regional_client): + logger.info("DirectoryService - Describing Directories...") + try: + describe_fleets_paginator = regional_client.get_paginator( + "describe_directories" + ) + for page in describe_fleets_paginator.paginate(): + for directory in page["DirectoryDescriptions"]: + directory_id = directory["DirectoryId"] + # Radius Configuration + radius_authentication_protocol = ( + directory["RadiusSettings"]["AuthenticationProtocol"] + if "RadiusSettings" in directory + else None + ) + radius_status = ( + directory["RadiusStatus"] + if "RadiusStatus" in directory + else None + ) + + self.directories[directory_id] = Directory( + name=directory_id, + region=regional_client.region, + radius_settings=RadiusSettings( + authentication_protocol=radius_authentication_protocol, + status=radius_status, + ), + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_log_subscriptions__(self, regional_client): + logger.info("DirectoryService - Listing Log Subscriptions...") + try: + for directory in self.directories: + list_log_subscriptions_paginator = regional_client.get_paginator( + "list_log_subscriptions" + ) + list_log_subscriptions_parameters = {"DirectoryId": directory} + log_subscriptions = [] + for page in list_log_subscriptions_paginator.paginate( + **list_log_subscriptions_parameters + ): + for log_subscription_info in page["LogSubscriptions"]: + log_subscriptions.append( + LogSubscriptions( + log_group_name=log_subscription_info["LogGroupName"], + created_date_time=log_subscription_info[ + "SubscriptionCreatedDateTime" + ], + ) + ) + self.directories[directory].log_subscriptions = log_subscriptions + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_event_topics__(self, regional_client): + logger.info("DirectoryService - Describing Event Topics...") + try: + for directory in self.directories: + describe_event_topics_parameters = {"DirectoryId": directory} + event_topics = [] + describe_event_topics = regional_client.describe_event_topics( + **describe_event_topics_parameters + ) + for event_topic in describe_event_topics["EventTopics"]: + event_topics.append( + EventTopics( + topic_arn=event_topic["TopicArn"], + topic_name=event_topic["TopicName"], + status=event_topic["Status"], + created_date_time=event_topic["CreatedDateTime"], + ) + ) + self.directories[directory].event_topics = event_topics + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __list_certificates__(self, regional_client): + logger.info("DirectoryService - Listing Certificates...") + try: + for directory in self.directories: + list_certificates_paginator = regional_client.get_paginator( + "list_certificates" + ) + list_certificates_parameters = {"DirectoryId": directory} + certificates = [] + for page in list_certificates_paginator.paginate( + **list_certificates_parameters + ): + for certificate_info in page["CertificatesInfo"]: + certificates.append( + Certificate( + id=certificate_info["CertificateId"], + common_name=certificate_info["CommonName"], + state=certificate_info["State"], + expiry_date_time=certificate_info["ExpiryDateTime"], + type=certificate_info["Type"], + ) + ) + self.directories[directory].certificates = certificates + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __get_snapshot_limits__(self, regional_client): + logger.info("DirectoryService - Getting Snapshot Limits...") + try: + for directory in self.directories: + + get_snapshot_limits_parameters = {"DirectoryId": directory} + snapshot_limit = regional_client.get_snapshot_limits( + **get_snapshot_limits_parameters + ) + + self.directories[directory].snapshots_limits = SnapshotLimit( + manual_snapshots_current_count=snapshot_limit["SnapshotLimits"][ + "ManualSnapshotsCurrentCount" + ], + manual_snapshots_limit=snapshot_limit["SnapshotLimits"][ + "ManualSnapshotsLimit" + ], + manual_snapshots_limit_reached=snapshot_limit["SnapshotLimits"][ + "ManualSnapshotsLimitReached" + ], + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class SnapshotLimit(BaseModel): + manual_snapshots_limit: int + manual_snapshots_current_count: int + manual_snapshots_limit_reached: bool + + +class LogSubscriptions(BaseModel): + log_group_name: str + created_date_time: datetime + + +class EventTopicStatus(Enum): + Registered = "Registered" + NotFound = "Topic not found" + Failed = "Failed" + Delete = "Deleted" + + +class EventTopics(BaseModel): + topic_name: str + topic_arn: str + status: EventTopicStatus + created_date_time: datetime + + +class CertificateType(Enum): + ClientCertAuth = "ClientCertAuth" + ClientLDAPS = "ClientLDAPS" + + +class CertificateState(Enum): + Registering = "Registering" + Registered = "Registered" + RegisterFailed = "RegisterFailed" + Deregistering = "Deregistering" + Deregistered = "Deregistered" + DeregisterFailed = "DeregisterFailed" + + +class Certificate(BaseModel): + id: str + common_name: str + state: CertificateState + expiry_date_time: datetime + type: CertificateType + + +class AuthenticationProtocol(Enum): + PAP = "PAP" + CHAP = "CHAP" + MS_CHAPv1 = "MS-CHAPv1" + MS_CHAPv2 = "MS-CHAPv2" + + +class RadiusStatus(Enum): + """Status of the RADIUS MFA server connection""" + + Creating = "Creating" + Completed = "Completed" + Failed = "Failed" + + +class RadiusSettings(BaseModel): + authentication_protocol: Union[AuthenticationProtocol, None] + status: Union[RadiusStatus, None] + + +class Directory(BaseModel): + name: str + log_subscriptions: list[LogSubscriptions] = [] + event_topics: list[EventTopics] = [] + certificates: list[Certificate] = [] + snapshots_limits: SnapshotLimit = None + radius_settings: RadiusSettings = None + region: str diff --git a/providers/aws/services/directoryservice/directoryservice_service_test.py b/providers/aws/services/directoryservice/directoryservice_service_test.py new file mode 100644 index 00000000..a85ff583 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_service_test.py @@ -0,0 +1,225 @@ +from datetime import datetime +from unittest.mock import patch + +import botocore +from moto import mock_ds +from moto.core import DEFAULT_ACCOUNT_ID + +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.directoryservice.directoryservice_service import ( + AuthenticationProtocol, + CertificateState, + CertificateType, + DirectoryService, + EventTopicStatus, + RadiusStatus, +) + +# Mock Test Region +AWS_REGION = "eu-west-1" + +# Mocking Access Analyzer Calls +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwarg): + """We have to mock every AWS API call using Boto3""" + if operation_name == "DescribeDirectories": + return { + "DirectoryDescriptions": [ + { + "DirectoryId": "test-directory", + "Name": "test-directory", + "ShortName": "test-directory", + "RadiusSettings": { + "RadiusServers": [ + "test-server", + ], + "RadiusPort": 9999, + "RadiusTimeout": 100, + "RadiusRetries": 100, + "SharedSecret": "test-shared-secret", + "AuthenticationProtocol": "MS-CHAPv2", + "DisplayLabel": "test-directory", + "UseSameUsername": True | False, + }, + "RadiusStatus": "Creating", + }, + ], + } + if operation_name == "ListLogSubscriptions": + return { + "LogSubscriptions": [ + { + "DirectoryId": "test-directory", + "LogGroupName": "test-log-group", + "SubscriptionCreatedDateTime": datetime(2022, 1, 1), + }, + ], + } + if operation_name == "DescribeEventTopics": + return { + "EventTopics": [ + { + "DirectoryId": "test-directory", + "TopicName": "test-topic", + "TopicArn": f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic", + "CreatedDateTime": datetime(2022, 1, 1), + "Status": "Registered", + }, + ] + } + + if operation_name == "ListCertificates": + return { + "CertificatesInfo": [ + { + "CertificateId": "test-certificate", + "CommonName": "test-certificate", + "State": "Registered", + "ExpiryDateTime": datetime(2023, 1, 1), + "Type": "ClientLDAPS", + }, + ] + } + if operation_name == "GetSnapshotLimits": + return { + "SnapshotLimits": { + "ManualSnapshotsLimit": 123, + "ManualSnapshotsCurrentCount": 123, + "ManualSnapshotsLimitReached": True, + } + } + + return make_api_call(self, operation_name, kwarg) + + +# Mock generate_regional_clients() +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +@patch( + "providers.aws.services.directoryservice.directoryservice_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_DirectoryService_Service: + # Test DirectoryService Client + @mock_ds + def test__get_client__(self): + directoryservice = DirectoryService(current_audit_info) + assert ( + directoryservice.regional_clients[AWS_REGION].__class__.__name__ + == "DirectoryService" + ) + + # Test DirectoryService Session + @mock_ds + def test__get_session__(self): + directoryservice = DirectoryService(current_audit_info) + assert directoryservice.session.__class__.__name__ == "Session" + + # Test DirectoryService Service + @mock_ds + def test__get_service__(self): + directoryservice = DirectoryService(current_audit_info) + assert directoryservice.service == "ds" + + def test__describe_directories__(self): + # Set partition for the service + current_audit_info.audited_partition = "aws" + directoryservice = DirectoryService(current_audit_info) + + # __describe_directories__ + assert directoryservice.directories["test-directory"] + assert directoryservice.directories["test-directory"].name == "test-directory" + assert directoryservice.directories["test-directory"].region == AWS_REGION + assert ( + directoryservice.directories[ + "test-directory" + ].radius_settings.authentication_protocol + == AuthenticationProtocol.MS_CHAPv2 + ) + assert ( + directoryservice.directories["test-directory"].radius_settings.status + == RadiusStatus.Creating + ) + + # __list_log_subscriptions__ + assert ( + len(directoryservice.directories["test-directory"].log_subscriptions) == 1 + ) + assert ( + directoryservice.directories["test-directory"] + .log_subscriptions[0] + .log_group_name + == "test-log-group" + ) + assert directoryservice.directories["test-directory"].log_subscriptions[ + 0 + ].created_date_time == datetime(2022, 1, 1) + + # __describe_event_topics__ + assert len(directoryservice.directories["test-directory"].event_topics) == 1 + assert ( + directoryservice.directories["test-directory"].event_topics[0].topic_name + == "test-topic" + ) + assert ( + directoryservice.directories["test-directory"].event_topics[0].topic_arn + == f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic" + ) + assert ( + directoryservice.directories["test-directory"].event_topics[0].status + == EventTopicStatus.Registered + ) + assert directoryservice.directories["test-directory"].event_topics[ + 0 + ].created_date_time == datetime(2022, 1, 1) + + # __list_certificates__ + assert len(directoryservice.directories["test-directory"].certificates) == 1 + assert ( + directoryservice.directories["test-directory"].certificates[0].id + == "test-certificate" + ) + assert ( + directoryservice.directories["test-directory"].certificates[0].common_name + == "test-certificate" + ) + assert ( + directoryservice.directories["test-directory"].certificates[0].state + == CertificateState.Registered + ) + assert directoryservice.directories["test-directory"].certificates[ + 0 + ].expiry_date_time == datetime(2023, 1, 1) + assert ( + directoryservice.directories["test-directory"].certificates[0].type + == CertificateType.ClientLDAPS + ) + + # __get_snapshot_limits__ + assert directoryservice.directories["test-directory"].snapshots_limits + assert ( + directoryservice.directories[ + "test-directory" + ].snapshots_limits.manual_snapshots_limit + == 123 + ) + assert ( + directoryservice.directories[ + "test-directory" + ].snapshots_limits.manual_snapshots_current_count + == 123 + ) + assert ( + directoryservice.directories[ + "test-directory" + ].snapshots_limits.manual_snapshots_limit_reached + is True + ) diff --git a/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/__init__.py b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.metadata.json b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.metadata.json new file mode 100644 index 00000000..d3fb4061 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "directoryservice_supported_mfa_radius_enabled", + "CheckTitle": "Ensure Multi-Factor Authentication (MFA) using Radius Server is enabled in DS.", + "CheckType": [], + "ServiceName": "directoryservice", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id", + "Severity": "medium", + "ResourceType": "AwsDirectoryService", + "Description": "Ensure Multi-Factor Authentication (MFA) using Radius Server is enabled in DS.", + "Risk": "Multi-Factor Authentication (MFA) adds an extra layer of authentication assurance beyond traditional username and password.", + "RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_mfa.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Enabling MFA provides increased security to a user name and password as it requires the user to possess a solution that displays a time-sensitive authentication code.", + "Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_mfa.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.py b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.py new file mode 100644 index 00000000..bcfca012 --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled.py @@ -0,0 +1,31 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.directoryservice.directoryservice_client import ( + directoryservice_client, +) +from providers.aws.services.directoryservice.directoryservice_service import ( + RadiusStatus, +) + + +class directoryservice_supported_mfa_radius_enabled(Check): + def execute(self): + findings = [] + for directory in directoryservice_client.directories.values(): + if directory.radius_settings: + report = Check_Report(self.metadata) + report.region = directory.region + report.resource_id = directory.name + if directory.radius_settings.status == RadiusStatus.Completed: + report.status = "PASS" + report.status_extended = ( + f"Directory {directory.name} have Radius MFA enabled" + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"Directory {directory.name} does not have Radius MFA enabled" + ) + + findings.append(report) + + return findings diff --git a/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled_test.py b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled_test.py new file mode 100644 index 00000000..3e98bc9c --- /dev/null +++ b/providers/aws/services/directoryservice/directoryservice_supported_mfa_radius_enabled/directoryservice_supported_mfa_radius_enabled_test.py @@ -0,0 +1,155 @@ +from unittest import mock + +from providers.aws.services.directoryservice.directoryservice_service import ( + AuthenticationProtocol, + Directory, + RadiusSettings, + RadiusStatus, +) + +AWS_REGION = "eu-west-1" + + +class Test_directoryservice_supported_mfa_radius_enabled: + def test_no_directories(self): + directoryservice_client = mock.MagicMock + directoryservice_client.directories = {} + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import ( + directoryservice_supported_mfa_radius_enabled, + ) + + check = directoryservice_supported_mfa_radius_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_directory_no_radius_server(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=None, + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import ( + directoryservice_supported_mfa_radius_enabled, + ) + + check = directoryservice_supported_mfa_radius_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_directory_radius_server_status_failed(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=RadiusSettings( + authentication_protocol=AuthenticationProtocol.MS_CHAPv1, + status=RadiusStatus.Failed, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import ( + directoryservice_supported_mfa_radius_enabled, + ) + + check = directoryservice_supported_mfa_radius_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == directory_name + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory {directory_name} does not have Radius MFA enabled" + ) + + def test_directory_radius_server_status_creating(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=RadiusSettings( + authentication_protocol=AuthenticationProtocol.MS_CHAPv2, + status=RadiusStatus.Creating, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import ( + directoryservice_supported_mfa_radius_enabled, + ) + + check = directoryservice_supported_mfa_radius_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == directory_name + assert result[0].region == AWS_REGION + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Directory {directory_name} does not have Radius MFA enabled" + ) + + def test_directory_radius_server_status_completed(self): + directoryservice_client = mock.MagicMock + directory_name = "test-directory" + directoryservice_client.directories = { + directory_name: Directory( + name=directory_name, + region=AWS_REGION, + radius_settings=RadiusSettings( + authentication_protocol=AuthenticationProtocol.MS_CHAPv2, + status=RadiusStatus.Completed, + ), + ) + } + with mock.patch( + "providers.aws.services.directoryservice.directoryservice_service.DirectoryService", + new=directoryservice_client, + ): + # Test Check + from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import ( + directoryservice_supported_mfa_radius_enabled, + ) + + check = directoryservice_supported_mfa_radius_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].resource_id == directory_name + assert result[0].region == AWS_REGION + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Directory {directory_name} have Radius MFA enabled" + )