feat(): workspace service and checks (#1503)

This commit is contained in:
Nacho Rivera
2022-11-17 22:59:14 +01:00
committed by GitHub
parent 7d80a9d048
commit 538496ed6b
8 changed files with 368 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.workspaces.workspaces_service import WorkSpaces
workspaces_client = WorkSpaces(current_audit_info)

View File

@@ -0,0 +1,64 @@
import threading
from pydantic import BaseModel
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################################ WorkSpaces
class WorkSpaces:
def __init__(self, audit_info):
self.service = "workspaces"
self.session = audit_info.audit_session
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.workspaces = []
self.__threading_call__(self.__describe_workspaces__)
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_workspaces__(self, regional_client):
logger.info("WorkSpaces - describing workspaces...")
try:
describe_workspaces_paginator = regional_client.get_paginator(
"describe_workspaces"
)
for page in describe_workspaces_paginator.paginate():
for workspace in page["Workspaces"]:
workspace_to_append = WorkSpace(
id=workspace["WorkspaceId"], region=regional_client.region
)
if (
"UserVolumeEncryptionEnabled" in workspace
and workspace["UserVolumeEncryptionEnabled"]
):
workspace_to_append.user_volume_encryption_enabled = True
if (
"RootVolumeEncryptionEnabled" in workspace
and workspace["RootVolumeEncryptionEnabled"]
):
workspace_to_append.root_volume_encryption_enabled = True
self.workspaces.append(workspace_to_append)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class WorkSpace(BaseModel):
id: str
arn: str = ""
region: str
user_volume_encryption_enabled: bool = None
root_volume_encryption_enabled: bool = None

View File

@@ -0,0 +1,93 @@
from unittest.mock import patch
from uuid import uuid4
import botocore
from boto3 import session
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.workspaces.workspaces_service import WorkSpaces
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "eu-west-1"
workspace_id = str(uuid4())
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeWorkspaces":
return {
"Workspaces": [
{
"WorkspaceId": workspace_id,
"UserVolumeEncryptionEnabled": True,
"RootVolumeEncryptionEnabled": True,
},
],
}
return make_api_call(self, operation_name, kwarg)
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.workspaces.workspaces_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_WorkSpaces_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test WorkSpaces Service
def test_service(self):
audit_info = self.set_mocked_audit_info()
workspaces = WorkSpaces(audit_info)
assert workspaces.service == "workspaces"
# Test WorkSpaces client
def test_client(self):
audit_info = self.set_mocked_audit_info()
workspaces = WorkSpaces(audit_info)
for reg_client in workspaces.regional_clients.values():
assert reg_client.__class__.__name__ == "WorkSpaces"
# Test WorkSpaces session
def test__get_session__(self):
audit_info = self.set_mocked_audit_info()
workspaces = WorkSpaces(audit_info)
assert workspaces.session.__class__.__name__ == "Session"
# Test WorkSpaces describe workspaces
def test__describe_workspaces__(self):
audit_info = self.set_mocked_audit_info()
workspaces = WorkSpaces(audit_info)
assert len(workspaces.workspaces) == 1
assert workspaces.workspaces[0].id == workspace_id
assert workspaces.workspaces[0].region == AWS_REGION
assert workspaces.workspaces[0].user_volume_encryption_enabled
assert workspaces.workspaces[0].root_volume_encryption_enabled

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "workspaces_volume_encryption_enabled",
"CheckTitle": "Ensure that your Amazon WorkSpaces storage volumes are encrypted in order to meet security and compliance requirements",
"CheckType": [],
"ServiceName": "workspaces",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:workspaces:region:account-id:workspace",
"Severity": "high",
"ResourceType": "AwsWorkspaces",
"Description": "Ensure that your Amazon WorkSpaces storage volumes are encrypted in order to meet security and compliance requirements",
"Risk": "If the value listed in the Volume Encryption column is Disabled the selected AWS WorkSpaces instance volumes (root and user volumes) are not encrypted. Therefore your data-at-rest is not protected from unauthorized access and does not meet the compliance requirements regarding data encryption.",
"RelatedUrl": "https://docs.aws.amazon.com/workspaces/latest/adminguide/encrypt-workspaces.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "https://docs.bridgecrew.io/docs/ensure-that-workspace-root-volumes-are-encrypted#cloudformation",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/WorkSpaces/storage-encryption.html",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-that-workspace-root-volumes-are-encrypted#terraform"
},
"Recommendation": {
"Text": "WorkSpaces is integrated with the AWS Key Management Service (AWS KMS). This enables you to encrypt storage volumes of WorkSpaces using AWS KMS Key. When you launch a WorkSpace you can encrypt the root volume (for Microsoft Windows - the C drive; for Linux - /) and the user volume (for Windows - the D drive; for Linux - /home). Doing so ensures that the data stored at rest - disk I/O to the volume - and snapshots created from the volumes are all encrypted",
"Url": "https://docs.aws.amazon.com/workspaces/latest/adminguide/encrypt-workspaces.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,33 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.workspaces.workspaces_client import workspaces_client
class workspaces_volume_encryption_enabled(Check):
def execute(self):
findings = []
for workspace in workspaces_client.workspaces:
report = Check_Report(self.metadata)
report.region = workspace.region
report.resource_id = workspace.id
report.resource_arn = workspace.arn
report.status = "PASS"
report.status_extended = f"WorkSpaces workspace {workspace.id} without root or user unencrypted volumes"
if not workspace.user_volume_encryption_enabled:
report.status = "FAIL"
report.status_extended = (
f"WorkSpaces workspace {workspace.id} with user unencrypted volumes"
)
if not workspace.root_volume_encryption_enabled:
report.status = "FAIL"
report.status_extended = (
f"WorkSpaces workspace {workspace.id} with root unencrypted volumes"
)
if (
not workspace.root_volume_encryption_enabled
and not workspace.user_volume_encryption_enabled
):
report.status = "FAIL"
report.status_extended = f"WorkSpaces workspace {workspace.id} with root and user unencrypted volumes"
findings.append(report)
return findings

View File

@@ -0,0 +1,139 @@
from re import search
from unittest import mock
from uuid import uuid4
from providers.aws.services.workspaces.workspaces_service import WorkSpace
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_NUMBER = "123456789012"
workspace_id = str(uuid4())
class Test_workspaces_volume_encryption_enabled:
def test_no_workspaces(self):
workspaces_client = mock.MagicMock
workspaces_client.workspaces = []
with mock.patch(
"providers.aws.services.workspaces.workspaces_service.WorkSpaces",
workspaces_client,
):
from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import (
workspaces_volume_encryption_enabled,
)
check = workspaces_volume_encryption_enabled()
result = check.execute()
assert len(result) == 0
def test_workspaces_encrypted(self):
workspaces_client = mock.MagicMock
workspaces_client.workspaces = []
workspaces_client.workspaces.append(
WorkSpace(
id=workspace_id,
region=AWS_REGION,
user_volume_encryption_enabled=True,
root_volume_encryption_enabled=True,
)
)
with mock.patch(
"providers.aws.services.workspaces.workspaces_service.WorkSpaces",
workspaces_client,
):
from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import (
workspaces_volume_encryption_enabled,
)
check = workspaces_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"without root or user unencrypted volumes", result[0].status_extended
)
assert result[0].resource_id == workspace_id
assert result[0].resource_arn == ""
def test_workspaces_user_not_encrypted(self):
workspaces_client = mock.MagicMock
workspaces_client.workspaces = []
workspaces_client.workspaces.append(
WorkSpace(
id=workspace_id,
region=AWS_REGION,
user_volume_encryption_enabled=False,
root_volume_encryption_enabled=True,
)
)
with mock.patch(
"providers.aws.services.workspaces.workspaces_service.WorkSpaces",
workspaces_client,
):
from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import (
workspaces_volume_encryption_enabled,
)
check = workspaces_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("user unencrypted volumes", result[0].status_extended)
assert result[0].resource_id == workspace_id
assert result[0].resource_arn == ""
def test_workspaces_root_not_encrypted(self):
workspaces_client = mock.MagicMock
workspaces_client.workspaces = []
workspaces_client.workspaces.append(
WorkSpace(
id=workspace_id,
region=AWS_REGION,
user_volume_encryption_enabled=True,
root_volume_encryption_enabled=False,
)
)
with mock.patch(
"providers.aws.services.workspaces.workspaces_service.WorkSpaces",
workspaces_client,
):
from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import (
workspaces_volume_encryption_enabled,
)
check = workspaces_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("root unencrypted volumes", result[0].status_extended)
assert result[0].resource_id == workspace_id
assert result[0].resource_arn == ""
def test_workspaces_user_and_root_not_encrypted(self):
workspaces_client = mock.MagicMock
workspaces_client.workspaces = []
workspaces_client.workspaces.append(
WorkSpace(
id=workspace_id,
region=AWS_REGION,
user_volume_encryption_enabled=False,
root_volume_encryption_enabled=False,
)
)
with mock.patch(
"providers.aws.services.workspaces.workspaces_service.WorkSpaces",
workspaces_client,
):
from providers.aws.services.workspaces.workspaces_volume_encryption_enabled.workspaces_volume_encryption_enabled import (
workspaces_volume_encryption_enabled,
)
check = workspaces_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"with root and user unencrypted volumes", result[0].status_extended
)
assert result[0].resource_id == workspace_id
assert result[0].resource_arn == ""