diff --git a/providers/aws/services/workspaces/__init__.py b/providers/aws/services/workspaces/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/workspaces/workspaces_client.py b/providers/aws/services/workspaces/workspaces_client.py new file mode 100644 index 00000000..530d8dab --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_client.py @@ -0,0 +1,4 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.workspaces.workspaces_service import WorkSpaces + +workspaces_client = WorkSpaces(current_audit_info) diff --git a/providers/aws/services/workspaces/workspaces_service.py b/providers/aws/services/workspaces/workspaces_service.py new file mode 100644 index 00000000..4ad79cd7 --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_service.py @@ -0,0 +1,64 @@ +import threading + +from pydantic import BaseModel + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################################ WorkSpaces +class WorkSpaces: + def __init__(self, audit_info): + self.service = "workspaces" + self.session = audit_info.audit_session + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.workspaces = [] + self.__threading_call__(self.__describe_workspaces__) + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __describe_workspaces__(self, regional_client): + logger.info("WorkSpaces - describing workspaces...") + try: + describe_workspaces_paginator = regional_client.get_paginator( + "describe_workspaces" + ) + for page in describe_workspaces_paginator.paginate(): + for workspace in page["Workspaces"]: + workspace_to_append = WorkSpace( + id=workspace["WorkspaceId"], region=regional_client.region + ) + if ( + "UserVolumeEncryptionEnabled" in workspace + and workspace["UserVolumeEncryptionEnabled"] + ): + workspace_to_append.user_volume_encryption_enabled = True + if ( + "RootVolumeEncryptionEnabled" in workspace + and workspace["RootVolumeEncryptionEnabled"] + ): + workspace_to_append.root_volume_encryption_enabled = True + self.workspaces.append(workspace_to_append) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class WorkSpace(BaseModel): + id: str + arn: str = "" + region: str + user_volume_encryption_enabled: bool = None + root_volume_encryption_enabled: bool = None diff --git a/providers/aws/services/workspaces/workspaces_service_test.py b/providers/aws/services/workspaces/workspaces_service_test.py new file mode 100644 index 00000000..f85448ce --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_service_test.py @@ -0,0 +1,93 @@ +from unittest.mock import patch +from uuid import uuid4 + +import botocore +from boto3 import session + +from providers.aws.lib.audit_info.models import AWS_Audit_Info +from providers.aws.services.workspaces.workspaces_service import WorkSpaces + +AWS_ACCOUNT_NUMBER = 123456789012 +AWS_REGION = "eu-west-1" + + +workspace_id = str(uuid4()) + +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwarg): + if operation_name == "DescribeWorkspaces": + return { + "Workspaces": [ + { + "WorkspaceId": workspace_id, + "UserVolumeEncryptionEnabled": True, + "RootVolumeEncryptionEnabled": True, + }, + ], + } + return make_api_call(self, operation_name, kwarg) + + +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +@patch( + "providers.aws.services.workspaces.workspaces_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_WorkSpaces_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + ) + return audit_info + + # Test WorkSpaces Service + def test_service(self): + audit_info = self.set_mocked_audit_info() + workspaces = WorkSpaces(audit_info) + assert workspaces.service == "workspaces" + + # Test WorkSpaces client + def test_client(self): + audit_info = self.set_mocked_audit_info() + workspaces = WorkSpaces(audit_info) + for reg_client in workspaces.regional_clients.values(): + assert reg_client.__class__.__name__ == "WorkSpaces" + + # Test WorkSpaces session + def test__get_session__(self): + audit_info = self.set_mocked_audit_info() + workspaces = WorkSpaces(audit_info) + assert workspaces.session.__class__.__name__ == "Session" + + # Test WorkSpaces describe workspaces + def test__describe_workspaces__(self): + audit_info = self.set_mocked_audit_info() + workspaces = WorkSpaces(audit_info) + assert len(workspaces.workspaces) == 1 + assert workspaces.workspaces[0].id == workspace_id + assert workspaces.workspaces[0].region == AWS_REGION + assert workspaces.workspaces[0].user_volume_encryption_enabled + assert workspaces.workspaces[0].root_volume_encryption_enabled diff --git a/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/__init__.py b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.metadata.json b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.metadata.json new file mode 100644 index 00000000..e8b37123 --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "workspaces_volume_encryption_enabled", + "CheckTitle": "Ensure that your Amazon WorkSpaces storage volumes are encrypted in order to meet security and compliance requirements", + "CheckType": [], + "ServiceName": "workspaces", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:workspaces:region:account-id:workspace", + "Severity": "high", + "ResourceType": "AwsWorkspaces", + "Description": "Ensure that your Amazon WorkSpaces storage volumes are encrypted in order to meet security and compliance requirements", + "Risk": "If the value listed in the Volume Encryption column is Disabled the selected AWS WorkSpaces instance volumes (root and user volumes) are not encrypted. Therefore your data-at-rest is not protected from unauthorized access and does not meet the compliance requirements regarding data encryption.", + "RelatedUrl": "https://docs.aws.amazon.com/workspaces/latest/adminguide/encrypt-workspaces.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "https://docs.bridgecrew.io/docs/ensure-that-workspace-root-volumes-are-encrypted#cloudformation", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/WorkSpaces/storage-encryption.html", + "Terraform": "https://docs.bridgecrew.io/docs/ensure-that-workspace-root-volumes-are-encrypted#terraform" + }, + "Recommendation": { + "Text": "WorkSpaces is integrated with the AWS Key Management Service (AWS KMS). This enables you to encrypt storage volumes of WorkSpaces using AWS KMS Key. When you launch a WorkSpace you can encrypt the root volume (for Microsoft Windows - the C drive; for Linux - /) and the user volume (for Windows - the D drive; for Linux - /home). Doing so ensures that the data stored at rest - disk I/O to the volume - and snapshots created from the volumes are all encrypted", + "Url": "https://docs.aws.amazon.com/workspaces/latest/adminguide/encrypt-workspaces.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] + } diff --git a/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.py b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.py new file mode 100644 index 00000000..93ae070b --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled.py @@ -0,0 +1,33 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.workspaces.workspaces_client import workspaces_client + + +class workspaces_volume_encryption_enabled(Check): + def execute(self): + findings = [] + for workspace in workspaces_client.workspaces: + report = Check_Report(self.metadata) + report.region = workspace.region + report.resource_id = workspace.id + report.resource_arn = workspace.arn + report.status = "PASS" + report.status_extended = f"WorkSpaces workspace {workspace.id} without root or user unencrypted volumes" + if not workspace.user_volume_encryption_enabled: + report.status = "FAIL" + report.status_extended = ( + f"WorkSpaces workspace {workspace.id} with user unencrypted volumes" + ) + if not workspace.root_volume_encryption_enabled: + report.status = "FAIL" + report.status_extended = ( + f"WorkSpaces workspace {workspace.id} with root unencrypted volumes" + ) + if ( + not workspace.root_volume_encryption_enabled + and not workspace.user_volume_encryption_enabled + ): + report.status = "FAIL" + report.status_extended = f"WorkSpaces workspace {workspace.id} with root and user unencrypted volumes" + + findings.append(report) + return findings diff --git a/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled_test.py b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled_test.py new file mode 100644 index 00000000..d144479d --- /dev/null +++ b/providers/aws/services/workspaces/workspaces_volume_encryption_enabled/workspaces_volume_encryption_enabled_test.py @@ -0,0 +1,139 @@ +from re import search +from unittest import mock +from uuid import uuid4 + +from providers.aws.services.workspaces.workspaces_service import WorkSpace + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" + +workspace_id = str(uuid4()) + + +class Test_workspaces_volume_encryption_enabled: + def test_no_workspaces(self): + workspaces_client = mock.MagicMock + workspaces_client.workspaces = [] + with mock.patch( + "providers.aws.services.workspaces.workspaces_service.WorkSpaces", + workspaces_client, + ): + from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import ( + workspaces_volume_encryption_enabled, + ) + + check = workspaces_volume_encryption_enabled() + result = check.execute() + assert len(result) == 0 + + def test_workspaces_encrypted(self): + workspaces_client = mock.MagicMock + workspaces_client.workspaces = [] + workspaces_client.workspaces.append( + WorkSpace( + id=workspace_id, + region=AWS_REGION, + user_volume_encryption_enabled=True, + root_volume_encryption_enabled=True, + ) + ) + with mock.patch( + "providers.aws.services.workspaces.workspaces_service.WorkSpaces", + workspaces_client, + ): + from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import ( + workspaces_volume_encryption_enabled, + ) + + check = workspaces_volume_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "without root or user unencrypted volumes", result[0].status_extended + ) + assert result[0].resource_id == workspace_id + assert result[0].resource_arn == "" + + def test_workspaces_user_not_encrypted(self): + workspaces_client = mock.MagicMock + workspaces_client.workspaces = [] + workspaces_client.workspaces.append( + WorkSpace( + id=workspace_id, + region=AWS_REGION, + user_volume_encryption_enabled=False, + root_volume_encryption_enabled=True, + ) + ) + with mock.patch( + "providers.aws.services.workspaces.workspaces_service.WorkSpaces", + workspaces_client, + ): + from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import ( + workspaces_volume_encryption_enabled, + ) + + check = workspaces_volume_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search("user unencrypted volumes", result[0].status_extended) + assert result[0].resource_id == workspace_id + assert result[0].resource_arn == "" + + def test_workspaces_root_not_encrypted(self): + workspaces_client = mock.MagicMock + workspaces_client.workspaces = [] + workspaces_client.workspaces.append( + WorkSpace( + id=workspace_id, + region=AWS_REGION, + user_volume_encryption_enabled=True, + root_volume_encryption_enabled=False, + ) + ) + with mock.patch( + "providers.aws.services.workspaces.workspaces_service.WorkSpaces", + workspaces_client, + ): + from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import ( + workspaces_volume_encryption_enabled, + ) + + check = workspaces_volume_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search("root unencrypted volumes", result[0].status_extended) + assert result[0].resource_id == workspace_id + assert result[0].resource_arn == "" + + def test_workspaces_user_and_root_not_encrypted(self): + workspaces_client = mock.MagicMock + workspaces_client.workspaces = [] + workspaces_client.workspaces.append( + WorkSpace( + id=workspace_id, + region=AWS_REGION, + user_volume_encryption_enabled=False, + root_volume_encryption_enabled=False, + ) + ) + with mock.patch( + "providers.aws.services.workspaces.workspaces_service.WorkSpaces", + workspaces_client, + ): + from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import ( + workspaces_volume_encryption_enabled, + ) + + check = workspaces_volume_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "with root and user unencrypted volumes", result[0].status_extended + ) + assert result[0].resource_id == workspace_id + assert result[0].resource_arn == ""