feat(directoryservice): Service and checks (#1477)

This commit is contained in:
Pepe Fagoaga
2022-11-17 11:16:36 +01:00
committed by GitHub
parent 684b7fe0b8
commit d2d2c75967
32 changed files with 1743 additions and 25 deletions

View File

@@ -26,6 +26,7 @@ pytest = "7.1.2"
pytest-xdist = "2.5.0"
coverage = "6.4.1"
sure = "2.0.0"
freezegun = "1.2.1"
[requires]
python_version = "3.9"

56
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "4e3096e19f235b38d957c39849b152deb4a0ffaa4af22dfcb49ec6446b190918"
"sha256": "881edb3306efd59b84c75bd2bff5acbc29397eb7f12321203c62a013f0491e2e"
},
"pipfile-spec": 6,
"requires": {
@@ -26,26 +26,26 @@
},
"boto3": {
"hashes": [
"sha256:b39303fdda9b5d77a152e3ec9f264ae318ccdaa853eaf694626dc335464ded98",
"sha256:c02fc93a926944b4b426a170d2dae274b4c8c09ec5259450b94269a8ce990dd7"
"sha256:15b059251990706c5d5a556c42a6e6e781a51edadf48a42afdd06bd313adfaf2",
"sha256:27efa5eb229364bc4643d9e182de0891f73c21b65ef80b5bd02a977caeaf595c"
],
"index": "pypi",
"version": "==1.26.8"
"version": "==1.26.9"
},
"botocore": {
"hashes": [
"sha256:48cf33d7c513320711321c3b303b0c9810b23e15fa03424f7323883e4ce6cef8",
"sha256:9c6adcf4e080be63b92f50d01e176ef2d1d2a3da7d8387a964abb9eb65fc8aad"
"sha256:01f168e2418419a6d8b335ecc4330faa6a7332d5a097029fc9b4a3ae3c41cea2",
"sha256:c876a7c6a07d7f86f46a8cc765f0d38999b84ac1d7b0666d6ce08c0503e13145"
],
"index": "pypi",
"version": "==1.29.8"
"version": "==1.29.9"
},
"certifi": {
"hashes": [
"sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14",
"sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382"
],
"markers": "python_version >= '3.6'",
"markers": "python_full_version >= '3.6.0'",
"version": "==2022.9.24"
},
"charset-normalizer": {
@@ -196,7 +196,7 @@
"sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174",
"sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"
],
"markers": "python_version >= '3.6'",
"markers": "python_full_version >= '3.6.0'",
"version": "==6.0"
},
"requests": {
@@ -309,26 +309,26 @@
},
"boto3": {
"hashes": [
"sha256:b39303fdda9b5d77a152e3ec9f264ae318ccdaa853eaf694626dc335464ded98",
"sha256:c02fc93a926944b4b426a170d2dae274b4c8c09ec5259450b94269a8ce990dd7"
"sha256:15b059251990706c5d5a556c42a6e6e781a51edadf48a42afdd06bd313adfaf2",
"sha256:27efa5eb229364bc4643d9e182de0891f73c21b65ef80b5bd02a977caeaf595c"
],
"index": "pypi",
"version": "==1.26.8"
"version": "==1.26.9"
},
"botocore": {
"hashes": [
"sha256:48cf33d7c513320711321c3b303b0c9810b23e15fa03424f7323883e4ce6cef8",
"sha256:9c6adcf4e080be63b92f50d01e176ef2d1d2a3da7d8387a964abb9eb65fc8aad"
"sha256:01f168e2418419a6d8b335ecc4330faa6a7332d5a097029fc9b4a3ae3c41cea2",
"sha256:c876a7c6a07d7f86f46a8cc765f0d38999b84ac1d7b0666d6ce08c0503e13145"
],
"index": "pypi",
"version": "==1.29.8"
"version": "==1.29.9"
},
"certifi": {
"hashes": [
"sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14",
"sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382"
],
"markers": "python_version >= '3.6'",
"markers": "python_full_version >= '3.6.0'",
"version": "==2022.9.24"
},
"cffi": {
@@ -530,11 +530,11 @@
},
"exceptiongroup": {
"hashes": [
"sha256:a31cd183c3dea02e617aab5153588d5f7258a77b51f0ef41b3815ae8a0d0f695",
"sha256:c22f11ec6a10d2b453871c5c5fe887436c4d1961324ce9090f2ca6ddc4180c27"
"sha256:6002703c7d31fb9950ddc8780840f67880c440895dc1151dd551553aa1246e4a",
"sha256:76cac74b5207c5997678a1c7105cb6f14213c9c63c096a38cfcb529d83ce5c02"
],
"markers": "python_version < '3.11'",
"version": "==1.0.2"
"version": "==1.0.3"
},
"execnet": {
"hashes": [
@@ -552,6 +552,14 @@
"index": "pypi",
"version": "==5.0.4"
},
"freezegun": {
"hashes": [
"sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446",
"sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"
],
"index": "pypi",
"version": "==1.2.2"
},
"gitdb": {
"hashes": [
"sha256:8033ad4e853066ba6ca92050b9df2f89301b8fc8bf7e9324d412a63f8bf1a8fd",
@@ -596,7 +604,7 @@
"sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7",
"sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951"
],
"markers": "python_version < '4.0' and python_full_version >= '3.6.1'",
"markers": "python_full_version >= '3.6.1' and python_full_version < '4.0.0'",
"version": "==5.10.1"
},
"jinja2": {
@@ -628,7 +636,7 @@
"sha256:1e525177574c23ae0f55cd62382632a083a0339928f0ca846a975a4da9851cec",
"sha256:780a22d517cdc857d9714a80d8349c546945063f20853ea32ba7f85bc643ec7d"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"markers": "python_version >= '3.7' and python_full_version < '4.0.0'",
"version": "==0.1.2"
},
"lazy-object-proxy": {
@@ -738,7 +746,7 @@
"sha256:34fbd14b7501abe25e64d7b4624a9db02cde1a578d285b3da6f34b290cdf0b3a",
"sha256:7cf27585dd7970b7257cefe48e1a3a10d4e34421831bdb472d96967433bc27bd"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"markers": "python_version >= '3.7' and python_full_version < '4.0.0'",
"version": "==0.3.4"
},
"openapi-spec-validator": {
@@ -762,7 +770,7 @@
"sha256:5c869d315be50776cc8a993f3af43e0c60dc01506b399643f919034ebf4cdcab",
"sha256:cdd7b1f9d7d5c8b8d3315dbf5a86b2596053ae845f056f57d97c0eefff84da14"
],
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"markers": "python_version >= '3.7' and python_full_version < '4.0.0'",
"version": "==0.4.3"
},
"pathspec": {
@@ -938,7 +946,7 @@
"sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174",
"sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"
],
"markers": "python_version >= '3.6'",
"markers": "python_full_version >= '3.6.0'",
"version": "==6.0"
},
"requests": {

View File

@@ -17,7 +17,7 @@ from providers.aws.services.codeartifact.codeartifact_service import (
AWS_REGION = "eu-west-1"
class Test_accessanalyzer_enabled_without_findings:
class Test_codeartifact_packages_external_public_publishing_disabled:
def test_no_repositories(self):
codeartifact_client = mock.MagicMock
codeartifact_client.repositories = {}

View File

@@ -0,0 +1,6 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.directoryservice.directoryservice_service import (
DirectoryService,
)
directoryservice_client = DirectoryService(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_directory_log_forwarding_enabled",
"CheckTitle": "Directory Service monitoring with CloudWatch logs.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "medium",
"ResourceType": "AwsDirectoryService",
"Description": "Directory Service monitoring with CloudWatch logs.",
"Risk": "As a best practice, monitor your organization to ensure that changes are logged. This helps you to ensure that any unexpected change can be investigated and unwanted changes can be rolled back.",
"RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/incident-response.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "It is recommended that that the export of logs is enabled.",
"Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/incident-response.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,23 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
class directoryservice_directory_log_forwarding_enabled(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = directory.name
if directory.log_subscriptions:
report.status = "PASS"
report.status_extended = f"Directory Service {directory.name} have log forwarding to CloudWatch enabled"
else:
report.status = "FAIL"
report.status_extended = f"Directory Service {directory.name} have log forwarding to CloudWatch disabled"
findings.append(report)
return findings

View File

@@ -0,0 +1,95 @@
from datetime import datetime
from unittest import mock
from providers.aws.services.directoryservice.directoryservice_service import (
Directory,
LogSubscriptions,
)
AWS_REGION = "eu-west-1"
class Test_directoryservice_directory_log_forwarding_enabled:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import (
directoryservice_directory_log_forwarding_enabled,
)
check = directoryservice_directory_log_forwarding_enabled()
result = check.execute()
assert len(result) == 0
def test_one_directory_logging_disabled(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
log_subscriptions=[],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import (
directoryservice_directory_log_forwarding_enabled,
)
check = directoryservice_directory_log_forwarding_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory Service {directory_name} have log forwarding to CloudWatch disabled"
)
def test_one_directory_logging_enabled(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
log_subscriptions=[
LogSubscriptions(
log_group_name="test-log-group",
created_date_time=datetime(2022, 1, 1),
)
],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_log_forwarding_enabled.directoryservice_directory_log_forwarding_enabled import (
directoryservice_directory_log_forwarding_enabled,
)
check = directoryservice_directory_log_forwarding_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Directory Service {directory_name} have log forwarding to CloudWatch enabled"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_directory_monitor_notifications",
"CheckTitle": "Directory Service has SNS Notifications enabled.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "medium",
"ResourceType": "AwsDirectoryService",
"Description": "Directory Service has SNS Notifications enabled.",
"Risk": "As a best practice, monitor status of Directory Service. This helps to avoid late actions to fix Directory Service issues.",
"RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_enable_notifications.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "It is recommended set up SNS messaging to send email or text messages when the status of your directory changes.",
"Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_enable_notifications.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,27 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
class directoryservice_directory_monitor_notifications(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = directory.name
if directory.event_topics:
report.status = "PASS"
report.status_extended = (
f"Directory Service {directory.name} have SNS messaging enabled"
)
else:
report.status = "FAIL"
report.status_extended = (
f"Directory Service {directory.name} have SNS messaging disabled"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,100 @@
from datetime import datetime
from unittest import mock
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.services.directoryservice.directoryservice_service import (
Directory,
EventTopics,
EventTopicStatus,
)
AWS_REGION = "eu-west-1"
class Test_directoryservice_directory_monitor_notifications:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import (
directoryservice_directory_monitor_notifications,
)
check = directoryservice_directory_monitor_notifications()
result = check.execute()
assert len(result) == 0
def test_one_directory_logging_disabled(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
event_topics=[],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import (
directoryservice_directory_monitor_notifications,
)
check = directoryservice_directory_monitor_notifications()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory Service {directory_name} have SNS messaging disabled"
)
def test_one_directory_logging_enabled(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
event_topics=[
EventTopics(
topic_arn=f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic",
topic_name="test-topic",
status=EventTopicStatus.Registered,
created_date_time=datetime(2022, 1, 1),
)
],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_monitor_notifications.directoryservice_directory_monitor_notifications import (
directoryservice_directory_monitor_notifications,
)
check = directoryservice_directory_monitor_notifications()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Directory Service {directory_name} have SNS messaging enabled"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_directory_snapshots_limit",
"CheckTitle": "Directory Service Manual Snapshots limit reached.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "low",
"ResourceType": "AwsDirectoryService",
"Description": "Directory Service Manual Snapshots limit reached.",
"Risk": "A limit reached can bring unwanted results. The maximum number of manual snapshots is a hard limit.",
"RelatedUrl": "https://docs.aws.amazon.com/general/latest/gr/ds_region.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Monitor manual snapshots limit to ensure capacity when you need it.",
"Url": "https://docs.aws.amazon.com/general/latest/gr/ds_region.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,33 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
SNAPSHOT_LIMIT_THRESHOLD = 2
"""Number of remaining snapshots to reach the limit"""
class directoryservice_directory_snapshots_limit(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = directory.name
if directory.snapshots_limits.manual_snapshots_limit_reached:
report.status = "FAIL"
report.status_extended = f"Directory Service {directory.name} reached {directory.snapshots_limits.manual_snapshots_limit} Snapshots limit"
else:
limit_remaining = (
directory.snapshots_limits.manual_snapshots_limit
- directory.snapshots_limits.manual_snapshots_current_count
)
if limit_remaining <= SNAPSHOT_LIMIT_THRESHOLD:
report.status = "FAIL"
report.status_extended = f"Directory Service {directory.name} is about to reach {directory.snapshots_limits.manual_snapshots_limit} Snapshots which is the limit"
else:
report.status = "PASS"
report.status_extended = f"Directory Service {directory.name} is using {directory.snapshots_limits.manual_snapshots_current_count} out of {directory.snapshots_limits.manual_snapshots_limit} from the Snapshots Limit"
findings.append(report)
return findings

View File

@@ -0,0 +1,179 @@
from unittest import mock
from providers.aws.services.directoryservice.directoryservice_service import (
Directory,
SnapshotLimit,
)
AWS_REGION = "eu-west-1"
class Test_directoryservice_directory_snapshots_limit:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import (
directoryservice_directory_snapshots_limit,
)
check = directoryservice_directory_snapshots_limit()
result = check.execute()
assert len(result) == 0
def test_one_directory_snapshots_limit_reached(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
manual_snapshots_current_count = 5
manual_snapshots_limit = 5
manual_snapshots_limit_reached = True
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
snapshots_limits=SnapshotLimit(
manual_snapshots_current_count=manual_snapshots_current_count,
manual_snapshots_limit=manual_snapshots_limit,
manual_snapshots_limit_reached=manual_snapshots_limit_reached,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import (
directoryservice_directory_snapshots_limit,
)
check = directoryservice_directory_snapshots_limit()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory Service {directory_name} reached {manual_snapshots_limit} Snapshots limit"
)
def test_one_directory_snapshots_limit_over_threshold(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
manual_snapshots_current_count = 4
manual_snapshots_limit = 5
manual_snapshots_limit_reached = False
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
snapshots_limits=SnapshotLimit(
manual_snapshots_current_count=manual_snapshots_current_count,
manual_snapshots_limit=manual_snapshots_limit,
manual_snapshots_limit_reached=manual_snapshots_limit_reached,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import (
directoryservice_directory_snapshots_limit,
)
check = directoryservice_directory_snapshots_limit()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory Service {directory_name} is about to reach {manual_snapshots_limit} Snapshots which is the limit"
)
def test_one_directory_snapshots_limit_equal_threshold(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
manual_snapshots_current_count = 3
manual_snapshots_limit = 5
manual_snapshots_limit_reached = False
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
snapshots_limits=SnapshotLimit(
manual_snapshots_current_count=manual_snapshots_current_count,
manual_snapshots_limit=manual_snapshots_limit,
manual_snapshots_limit_reached=manual_snapshots_limit_reached,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import (
directoryservice_directory_snapshots_limit,
)
check = directoryservice_directory_snapshots_limit()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory Service {directory_name} is about to reach {manual_snapshots_limit} Snapshots which is the limit"
)
def test_one_directory_snapshots_limit_more_threshold(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
manual_snapshots_current_count = 1
manual_snapshots_limit = 5
manual_snapshots_limit_reached = False
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
snapshots_limits=SnapshotLimit(
manual_snapshots_current_count=manual_snapshots_current_count,
manual_snapshots_limit=manual_snapshots_limit,
manual_snapshots_limit_reached=manual_snapshots_limit_reached,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_directory_snapshots_limit.directoryservice_directory_snapshots_limit import (
directoryservice_directory_snapshots_limit,
)
check = directoryservice_directory_snapshots_limit()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "test-directory"
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Directory Service {directory_name} is using {manual_snapshots_current_count} out of {manual_snapshots_limit} from the Snapshots Limit"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_ldap_certificate_expiration",
"CheckTitle": "Directory Service LDAP Certificates expiration.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "medium",
"ResourceType": "AwsDirectoryService",
"Description": "Directory Service Manual Snapshots limit reached.",
"Risk": "Expired certificates can impact service availability.",
"RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_ldap.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Monitor certificate expiration and take automated action to alarm responsible team for taking care of the replacement or remove.",
"Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_ldap.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,33 @@
from datetime import datetime
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
DAYS_TO_EXPIRE_THRESHOLD = 90
"""Number of days to notify about a certificate expiration"""
class directoryservice_ldap_certificate_expiration(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
for certificate in directory.certificates:
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = certificate.id
remaining_days_to_expire = (
certificate.expiry_date_time - datetime.today()
).days
if remaining_days_to_expire <= DAYS_TO_EXPIRE_THRESHOLD:
report.status = "FAIL"
report.status_extended = f"LDAP Certificate {certificate.id} configured at {directory.name} is about to expire in {remaining_days_to_expire} days"
else:
report.status = "PASS"
report.status_extended = f"LDAP Certificate {certificate.id} configured at {directory.name} expires in {remaining_days_to_expire} days"
findings.append(report)
return findings

View File

@@ -0,0 +1,187 @@
from datetime import datetime
from unittest import mock
from freezegun import freeze_time
from providers.aws.services.directoryservice.directoryservice_service import (
Certificate,
CertificateState,
CertificateType,
Directory,
)
AWS_REGION = "eu-west-1"
# Always use a mocked date to test the certificates expiration
@freeze_time("2023-01-01")
class Test_directoryservice_ldap_certificate_expiration:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import (
directoryservice_ldap_certificate_expiration,
)
check = directoryservice_ldap_certificate_expiration()
result = check.execute()
assert len(result) == 0
def test_directory_no_certificate(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
certificates=[],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import (
directoryservice_ldap_certificate_expiration,
)
check = directoryservice_ldap_certificate_expiration()
result = check.execute()
assert len(result) == 0
def test_directory_certificate_expires_in_365_days(self):
remaining_days_to_expire = 365
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
certificate_id = "test-certificate"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
certificates=[
Certificate(
id=certificate_id,
common_name=certificate_id,
state=CertificateState.Registered,
type=CertificateType.ClientLDAPS,
expiry_date_time=datetime(2024, 1, 1),
)
],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import (
directoryservice_ldap_certificate_expiration,
)
check = directoryservice_ldap_certificate_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == certificate_id
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"LDAP Certificate {certificate_id} configured at {directory_name} expires in {remaining_days_to_expire} days"
)
def test_directory_certificate_expires_in_90_days(self):
remaining_days_to_expire = 90
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
certificate_id = "test-certificate"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
certificates=[
Certificate(
id=certificate_id,
common_name=certificate_id,
state=CertificateState.Registered,
type=CertificateType.ClientLDAPS,
expiry_date_time=datetime(2023, 4, 1),
)
],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import (
directoryservice_ldap_certificate_expiration,
)
check = directoryservice_ldap_certificate_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == certificate_id
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"LDAP Certificate {certificate_id} configured at {directory_name} is about to expire in {remaining_days_to_expire} days"
)
def test_directory_certificate_expires_in_31_days(self):
remaining_days_to_expire = 31
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
certificate_id = "test-certificate"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
certificates=[
Certificate(
id=certificate_id,
common_name=certificate_id,
state=CertificateState.Registered,
type=CertificateType.ClientLDAPS,
expiry_date_time=datetime(2023, 2, 1),
)
],
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_ldap_certificate_expiration.directoryservice_ldap_certificate_expiration import (
directoryservice_ldap_certificate_expiration,
)
check = directoryservice_ldap_certificate_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == certificate_id
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"LDAP Certificate {certificate_id} configured at {directory_name} is about to expire in {remaining_days_to_expire} days"
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_radius_server_security_protocol",
"CheckTitle": "Ensure Radius server in DS is using the recommended security protocol.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "medium",
"ResourceType": "AwsDirectoryService",
"Description": "Ensure Radius server in DS is using the recommended security protocol.",
"Risk": "As a best practice, you might need to configure the authentication protocol between the Microsoft AD DCs and the RADIUS/MFA server. Supported protocols are PAP, CHAP MS-CHAPv1, and MS-CHAPv2. MS-CHAPv2 is recommended because it provides the strongest security of the three options.",
"RelatedUrl": "https://aws.amazon.com/blogs/security/how-to-enable-multi-factor-authentication-for-amazon-workspaces-and-amazon-quicksight-by-using-microsoft-ad-and-on-premises-credentials/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "MS-CHAPv2 provides the strongest security of the options supported, and is therefore recommended.",
"Url": "https://aws.amazon.com/blogs/security/how-to-enable-multi-factor-authentication-for-amazon-workspaces-and-amazon-quicksight-by-using-microsoft-ad-and-on-premises-credentials/"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,30 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
from providers.aws.services.directoryservice.directoryservice_service import (
AuthenticationProtocol,
)
class directoryservice_radius_server_security_protocol(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
if directory.radius_settings:
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = directory.name
if (
directory.radius_settings.authentication_protocol
== AuthenticationProtocol.MS_CHAPv2
):
report.status = "PASS"
report.status_extended = f"Radius server of Directory {directory.name} have recommended security protocol for the Radius server"
else:
report.status = "FAIL"
report.status_extended = f"Radius server of Directory {directory.name} does not have recommended security protocol for the Radius server"
findings.append(report)
return findings

View File

@@ -0,0 +1,121 @@
from unittest import mock
from providers.aws.services.directoryservice.directoryservice_service import (
AuthenticationProtocol,
Directory,
RadiusSettings,
RadiusStatus,
)
AWS_REGION = "eu-west-1"
class Test_directoryservice_radius_server_security_protocol:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import (
directoryservice_radius_server_security_protocol,
)
check = directoryservice_radius_server_security_protocol()
result = check.execute()
assert len(result) == 0
def test_directory_no_radius_server(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=None,
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import (
directoryservice_radius_server_security_protocol,
)
check = directoryservice_radius_server_security_protocol()
result = check.execute()
assert len(result) == 0
def test_directory_radius_server_bad_auth_protocol(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=RadiusSettings(
authentication_protocol=AuthenticationProtocol.MS_CHAPv1,
status=RadiusStatus.Completed,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import (
directoryservice_radius_server_security_protocol,
)
check = directoryservice_radius_server_security_protocol()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == directory_name
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Radius server of Directory {directory_name} does not have recommended security protocol for the Radius server"
)
def test_directory_radius_server_secure_auth_protocol(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=RadiusSettings(
authentication_protocol=AuthenticationProtocol.MS_CHAPv2,
status=RadiusStatus.Completed,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_radius_server_security_protocol.directoryservice_radius_server_security_protocol import (
directoryservice_radius_server_security_protocol,
)
check = directoryservice_radius_server_security_protocol()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == directory_name
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Radius server of Directory {directory_name} have recommended security protocol for the Radius server"
)

View File

@@ -0,0 +1,254 @@
import threading
from datetime import datetime
from enum import Enum
from typing import Union
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## DirectoryService
class DirectoryService:
def __init__(self, audit_info):
self.service = "ds"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.directories = {}
self.__threading_call__(self.__describe_directories__)
self.__threading_call__(self.__list_log_subscriptions__)
self.__threading_call__(self.__describe_event_topics__)
self.__threading_call__(self.__list_certificates__)
self.__threading_call__(self.__get_snapshot_limits__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_directories__(self, regional_client):
logger.info("DirectoryService - Describing Directories...")
try:
describe_fleets_paginator = regional_client.get_paginator(
"describe_directories"
)
for page in describe_fleets_paginator.paginate():
for directory in page["DirectoryDescriptions"]:
directory_id = directory["DirectoryId"]
# Radius Configuration
radius_authentication_protocol = (
directory["RadiusSettings"]["AuthenticationProtocol"]
if "RadiusSettings" in directory
else None
)
radius_status = (
directory["RadiusStatus"]
if "RadiusStatus" in directory
else None
)
self.directories[directory_id] = Directory(
name=directory_id,
region=regional_client.region,
radius_settings=RadiusSettings(
authentication_protocol=radius_authentication_protocol,
status=radius_status,
),
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_log_subscriptions__(self, regional_client):
logger.info("DirectoryService - Listing Log Subscriptions...")
try:
for directory in self.directories:
list_log_subscriptions_paginator = regional_client.get_paginator(
"list_log_subscriptions"
)
list_log_subscriptions_parameters = {"DirectoryId": directory}
log_subscriptions = []
for page in list_log_subscriptions_paginator.paginate(
**list_log_subscriptions_parameters
):
for log_subscription_info in page["LogSubscriptions"]:
log_subscriptions.append(
LogSubscriptions(
log_group_name=log_subscription_info["LogGroupName"],
created_date_time=log_subscription_info[
"SubscriptionCreatedDateTime"
],
)
)
self.directories[directory].log_subscriptions = log_subscriptions
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_event_topics__(self, regional_client):
logger.info("DirectoryService - Describing Event Topics...")
try:
for directory in self.directories:
describe_event_topics_parameters = {"DirectoryId": directory}
event_topics = []
describe_event_topics = regional_client.describe_event_topics(
**describe_event_topics_parameters
)
for event_topic in describe_event_topics["EventTopics"]:
event_topics.append(
EventTopics(
topic_arn=event_topic["TopicArn"],
topic_name=event_topic["TopicName"],
status=event_topic["Status"],
created_date_time=event_topic["CreatedDateTime"],
)
)
self.directories[directory].event_topics = event_topics
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_certificates__(self, regional_client):
logger.info("DirectoryService - Listing Certificates...")
try:
for directory in self.directories:
list_certificates_paginator = regional_client.get_paginator(
"list_certificates"
)
list_certificates_parameters = {"DirectoryId": directory}
certificates = []
for page in list_certificates_paginator.paginate(
**list_certificates_parameters
):
for certificate_info in page["CertificatesInfo"]:
certificates.append(
Certificate(
id=certificate_info["CertificateId"],
common_name=certificate_info["CommonName"],
state=certificate_info["State"],
expiry_date_time=certificate_info["ExpiryDateTime"],
type=certificate_info["Type"],
)
)
self.directories[directory].certificates = certificates
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_snapshot_limits__(self, regional_client):
logger.info("DirectoryService - Getting Snapshot Limits...")
try:
for directory in self.directories:
get_snapshot_limits_parameters = {"DirectoryId": directory}
snapshot_limit = regional_client.get_snapshot_limits(
**get_snapshot_limits_parameters
)
self.directories[directory].snapshots_limits = SnapshotLimit(
manual_snapshots_current_count=snapshot_limit["SnapshotLimits"][
"ManualSnapshotsCurrentCount"
],
manual_snapshots_limit=snapshot_limit["SnapshotLimits"][
"ManualSnapshotsLimit"
],
manual_snapshots_limit_reached=snapshot_limit["SnapshotLimits"][
"ManualSnapshotsLimitReached"
],
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class SnapshotLimit(BaseModel):
manual_snapshots_limit: int
manual_snapshots_current_count: int
manual_snapshots_limit_reached: bool
class LogSubscriptions(BaseModel):
log_group_name: str
created_date_time: datetime
class EventTopicStatus(Enum):
Registered = "Registered"
NotFound = "Topic not found"
Failed = "Failed"
Delete = "Deleted"
class EventTopics(BaseModel):
topic_name: str
topic_arn: str
status: EventTopicStatus
created_date_time: datetime
class CertificateType(Enum):
ClientCertAuth = "ClientCertAuth"
ClientLDAPS = "ClientLDAPS"
class CertificateState(Enum):
Registering = "Registering"
Registered = "Registered"
RegisterFailed = "RegisterFailed"
Deregistering = "Deregistering"
Deregistered = "Deregistered"
DeregisterFailed = "DeregisterFailed"
class Certificate(BaseModel):
id: str
common_name: str
state: CertificateState
expiry_date_time: datetime
type: CertificateType
class AuthenticationProtocol(Enum):
PAP = "PAP"
CHAP = "CHAP"
MS_CHAPv1 = "MS-CHAPv1"
MS_CHAPv2 = "MS-CHAPv2"
class RadiusStatus(Enum):
"""Status of the RADIUS MFA server connection"""
Creating = "Creating"
Completed = "Completed"
Failed = "Failed"
class RadiusSettings(BaseModel):
authentication_protocol: Union[AuthenticationProtocol, None]
status: Union[RadiusStatus, None]
class Directory(BaseModel):
name: str
log_subscriptions: list[LogSubscriptions] = []
event_topics: list[EventTopics] = []
certificates: list[Certificate] = []
snapshots_limits: SnapshotLimit = None
radius_settings: RadiusSettings = None
region: str

View File

@@ -0,0 +1,225 @@
from datetime import datetime
from unittest.mock import patch
import botocore
from moto import mock_ds
from moto.core import DEFAULT_ACCOUNT_ID
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.directoryservice.directoryservice_service import (
AuthenticationProtocol,
CertificateState,
CertificateType,
DirectoryService,
EventTopicStatus,
RadiusStatus,
)
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
"""We have to mock every AWS API call using Boto3"""
if operation_name == "DescribeDirectories":
return {
"DirectoryDescriptions": [
{
"DirectoryId": "test-directory",
"Name": "test-directory",
"ShortName": "test-directory",
"RadiusSettings": {
"RadiusServers": [
"test-server",
],
"RadiusPort": 9999,
"RadiusTimeout": 100,
"RadiusRetries": 100,
"SharedSecret": "test-shared-secret",
"AuthenticationProtocol": "MS-CHAPv2",
"DisplayLabel": "test-directory",
"UseSameUsername": True | False,
},
"RadiusStatus": "Creating",
},
],
}
if operation_name == "ListLogSubscriptions":
return {
"LogSubscriptions": [
{
"DirectoryId": "test-directory",
"LogGroupName": "test-log-group",
"SubscriptionCreatedDateTime": datetime(2022, 1, 1),
},
],
}
if operation_name == "DescribeEventTopics":
return {
"EventTopics": [
{
"DirectoryId": "test-directory",
"TopicName": "test-topic",
"TopicArn": f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic",
"CreatedDateTime": datetime(2022, 1, 1),
"Status": "Registered",
},
]
}
if operation_name == "ListCertificates":
return {
"CertificatesInfo": [
{
"CertificateId": "test-certificate",
"CommonName": "test-certificate",
"State": "Registered",
"ExpiryDateTime": datetime(2023, 1, 1),
"Type": "ClientLDAPS",
},
]
}
if operation_name == "GetSnapshotLimits":
return {
"SnapshotLimits": {
"ManualSnapshotsLimit": 123,
"ManualSnapshotsCurrentCount": 123,
"ManualSnapshotsLimitReached": True,
}
}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.directoryservice.directoryservice_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_DirectoryService_Service:
# Test DirectoryService Client
@mock_ds
def test__get_client__(self):
directoryservice = DirectoryService(current_audit_info)
assert (
directoryservice.regional_clients[AWS_REGION].__class__.__name__
== "DirectoryService"
)
# Test DirectoryService Session
@mock_ds
def test__get_session__(self):
directoryservice = DirectoryService(current_audit_info)
assert directoryservice.session.__class__.__name__ == "Session"
# Test DirectoryService Service
@mock_ds
def test__get_service__(self):
directoryservice = DirectoryService(current_audit_info)
assert directoryservice.service == "ds"
def test__describe_directories__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
directoryservice = DirectoryService(current_audit_info)
# __describe_directories__
assert directoryservice.directories["test-directory"]
assert directoryservice.directories["test-directory"].name == "test-directory"
assert directoryservice.directories["test-directory"].region == AWS_REGION
assert (
directoryservice.directories[
"test-directory"
].radius_settings.authentication_protocol
== AuthenticationProtocol.MS_CHAPv2
)
assert (
directoryservice.directories["test-directory"].radius_settings.status
== RadiusStatus.Creating
)
# __list_log_subscriptions__
assert (
len(directoryservice.directories["test-directory"].log_subscriptions) == 1
)
assert (
directoryservice.directories["test-directory"]
.log_subscriptions[0]
.log_group_name
== "test-log-group"
)
assert directoryservice.directories["test-directory"].log_subscriptions[
0
].created_date_time == datetime(2022, 1, 1)
# __describe_event_topics__
assert len(directoryservice.directories["test-directory"].event_topics) == 1
assert (
directoryservice.directories["test-directory"].event_topics[0].topic_name
== "test-topic"
)
assert (
directoryservice.directories["test-directory"].event_topics[0].topic_arn
== f"arn:aws:sns:{AWS_REGION}:{DEFAULT_ACCOUNT_ID}:test-topic"
)
assert (
directoryservice.directories["test-directory"].event_topics[0].status
== EventTopicStatus.Registered
)
assert directoryservice.directories["test-directory"].event_topics[
0
].created_date_time == datetime(2022, 1, 1)
# __list_certificates__
assert len(directoryservice.directories["test-directory"].certificates) == 1
assert (
directoryservice.directories["test-directory"].certificates[0].id
== "test-certificate"
)
assert (
directoryservice.directories["test-directory"].certificates[0].common_name
== "test-certificate"
)
assert (
directoryservice.directories["test-directory"].certificates[0].state
== CertificateState.Registered
)
assert directoryservice.directories["test-directory"].certificates[
0
].expiry_date_time == datetime(2023, 1, 1)
assert (
directoryservice.directories["test-directory"].certificates[0].type
== CertificateType.ClientLDAPS
)
# __get_snapshot_limits__
assert directoryservice.directories["test-directory"].snapshots_limits
assert (
directoryservice.directories[
"test-directory"
].snapshots_limits.manual_snapshots_limit
== 123
)
assert (
directoryservice.directories[
"test-directory"
].snapshots_limits.manual_snapshots_current_count
== 123
)
assert (
directoryservice.directories[
"test-directory"
].snapshots_limits.manual_snapshots_limit_reached
is True
)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "directoryservice_supported_mfa_radius_enabled",
"CheckTitle": "Ensure Multi-Factor Authentication (MFA) using Radius Server is enabled in DS.",
"CheckType": [],
"ServiceName": "directoryservice",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:codeartifact:region:account-id:directory/directory-id",
"Severity": "medium",
"ResourceType": "AwsDirectoryService",
"Description": "Ensure Multi-Factor Authentication (MFA) using Radius Server is enabled in DS.",
"Risk": "Multi-Factor Authentication (MFA) adds an extra layer of authentication assurance beyond traditional username and password.",
"RelatedUrl": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_mfa.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enabling MFA provides increased security to a user name and password as it requires the user to possess a solution that displays a time-sensitive authentication code.",
"Url": "https://docs.aws.amazon.com/directoryservice/latest/admin-guide/ms_ad_mfa.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,31 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.directoryservice.directoryservice_client import (
directoryservice_client,
)
from providers.aws.services.directoryservice.directoryservice_service import (
RadiusStatus,
)
class directoryservice_supported_mfa_radius_enabled(Check):
def execute(self):
findings = []
for directory in directoryservice_client.directories.values():
if directory.radius_settings:
report = Check_Report(self.metadata)
report.region = directory.region
report.resource_id = directory.name
if directory.radius_settings.status == RadiusStatus.Completed:
report.status = "PASS"
report.status_extended = (
f"Directory {directory.name} have Radius MFA enabled"
)
else:
report.status = "FAIL"
report.status_extended = (
f"Directory {directory.name} does not have Radius MFA enabled"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,155 @@
from unittest import mock
from providers.aws.services.directoryservice.directoryservice_service import (
AuthenticationProtocol,
Directory,
RadiusSettings,
RadiusStatus,
)
AWS_REGION = "eu-west-1"
class Test_directoryservice_supported_mfa_radius_enabled:
def test_no_directories(self):
directoryservice_client = mock.MagicMock
directoryservice_client.directories = {}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import (
directoryservice_supported_mfa_radius_enabled,
)
check = directoryservice_supported_mfa_radius_enabled()
result = check.execute()
assert len(result) == 0
def test_directory_no_radius_server(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=None,
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import (
directoryservice_supported_mfa_radius_enabled,
)
check = directoryservice_supported_mfa_radius_enabled()
result = check.execute()
assert len(result) == 0
def test_directory_radius_server_status_failed(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=RadiusSettings(
authentication_protocol=AuthenticationProtocol.MS_CHAPv1,
status=RadiusStatus.Failed,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import (
directoryservice_supported_mfa_radius_enabled,
)
check = directoryservice_supported_mfa_radius_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == directory_name
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory {directory_name} does not have Radius MFA enabled"
)
def test_directory_radius_server_status_creating(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=RadiusSettings(
authentication_protocol=AuthenticationProtocol.MS_CHAPv2,
status=RadiusStatus.Creating,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import (
directoryservice_supported_mfa_radius_enabled,
)
check = directoryservice_supported_mfa_radius_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == directory_name
assert result[0].region == AWS_REGION
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Directory {directory_name} does not have Radius MFA enabled"
)
def test_directory_radius_server_status_completed(self):
directoryservice_client = mock.MagicMock
directory_name = "test-directory"
directoryservice_client.directories = {
directory_name: Directory(
name=directory_name,
region=AWS_REGION,
radius_settings=RadiusSettings(
authentication_protocol=AuthenticationProtocol.MS_CHAPv2,
status=RadiusStatus.Completed,
),
)
}
with mock.patch(
"providers.aws.services.directoryservice.directoryservice_service.DirectoryService",
new=directoryservice_client,
):
# Test Check
from providers.aws.services.directoryservice.directoryservice_supported_mfa_radius_enabled.directoryservice_supported_mfa_radius_enabled import (
directoryservice_supported_mfa_radius_enabled,
)
check = directoryservice_supported_mfa_radius_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == directory_name
assert result[0].region == AWS_REGION
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Directory {directory_name} have Radius MFA enabled"
)