feat(azure): new checks related with VMs service. (#3408)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Rubén De la Torre Vico
2024-02-20 16:34:34 +01:00
committed by GitHub
parent 9a22c2de8b
commit da1f266d1b
22 changed files with 1106 additions and 2 deletions

20
poetry.lock generated
View File

@@ -358,6 +358,22 @@ azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0"
[[package]]
name = "azure-mgmt-compute"
version = "30.5.0"
description = "Microsoft Azure Compute Management Client Library for Python"
optional = false
python-versions = ">=3.7"
files = [
{file = "azure-mgmt-compute-30.5.0.tar.gz", hash = "sha256:ed3ea34b799db0d52ee55e2f1ab4b0f09fa4a08f35e061ecb9aad9fb5a218844"},
{file = "azure_mgmt_compute-30.5.0-py3-none-any.whl", hash = "sha256:b65a6c1e22be7334604257d8d9f96a9c6dc4c6d4869f95d0d551c7c8170a2e71"},
]
[package.dependencies]
azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0"
[[package]]
name = "azure-mgmt-core"
version = "1.4.0"
@@ -2947,8 +2963,8 @@ astroid = ">=3.0.1,<=3.1.0-dev0"
colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""}
dill = [
{version = ">=0.2", markers = "python_version < \"3.11\""},
{version = ">=0.3.6", markers = "python_version >= \"3.11\""},
{version = ">=0.3.7", markers = "python_version >= \"3.12\""},
{version = ">=0.3.6", markers = "python_version >= \"3.11\" and python_version < \"3.12\""},
]
isort = ">=4.2.5,<5.13.0 || >5.13.0,<6"
mccabe = ">=0.6,<0.8"
@@ -4235,4 +4251,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.13"
content-hash = "cb76dd1a0fafbfa685bca46d1f214b7fb5158fa76bbcec46159a1b68add29456"
content-hash = "a73ada50dafeb373f791cf7b138519863b907c0f9198fa45809d7fb53c934de9"

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "defender_assessments_vm_endpoint_protection_installed",
"CheckTitle": "Ensure that Endpoint Protection for all Virtual Machines is installed",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.Security/assessments",
"Description": "Install endpoint protection for all virtual machines.",
"Risk": "Installing endpoint protection systems (like anti-malware for Azure) provides for real-time protection capability that helps identify and remove viruses, spyware, and other malicious software. These also offer configurable alerts when known-malicious or unwanted software attempts to install itself or run on Azure systems.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/security/fundamentals/antimalware",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/install-endpoint-protection.html#",
"Terraform": ""
},
"Recommendation": {
"Text": "Follow Microsoft Azure documentation to install endpoint protection from the security center. Alternatively, you can employ your own endpoint protection tool for your OS.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Endpoint protection will incur an additional cost to you."
}

View File

@@ -0,0 +1,41 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.defender.defender_client import defender_client
class defender_assessments_vm_endpoint_protection_installed(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
assessments,
) in defender_client.assessments.items():
pass
if (
"Install endpoint protection solution on virtual machines"
in assessments
):
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = assessments[
"Install endpoint protection solution on virtual machines"
].resource_name
report.resource_id = assessments[
"Install endpoint protection solution on virtual machines"
].resource_id
report.status_extended = f"Endpoint protection is set up in all VMs in subscription {subscription_name}."
if (
assessments[
"Install endpoint protection solution on virtual machines"
].status
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Endpoint protection is not set up in all VMs in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.azure.lib.audit_info.audit_info import azure_audit_info
from prowler.providers.azure.services.vm.vm_service import VirtualMachines
vm_client = VirtualMachines(azure_audit_info)

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "vm_ensure_attached_disks_encrypted_with_cmk",
"CheckTitle": "Ensure that 'OS and Data' disks are encrypted with Customer Managed Key (CMK)",
"CheckType": [],
"ServiceName": "vm",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.Compute/virtualMachines",
"Description": "Ensure that OS disks (boot volumes) and data disks (non-boot volumes) are encrypted with CMK (Customer Managed Keys). Customer Managed keys can be either ADE or Server Side Encryption (SSE).",
"Risk": "Encrypting the IaaS VM's OS disk (boot volume) and Data disks (non-boot volume) ensures that the entire content is fully unrecoverable without a key, thus protecting the volume from unwanted reads. PMK (Platform Managed Keys) are enabled by default in Azure-managed disks and allow encryption at rest. CMK is recommended because it gives the customer the option to control which specific keys are used for the encryption and decryption of the disk. The customer can then change keys and increase security by disabling them instead of relying on the PMK key that remains unchanging. There is also the option to increase security further by using automatically rotating keys so that access to disk is ensured to be limited. Organizations should evaluate what their security requirements are, however, for the data stored on the disk. For high-risk data using CMK is a must, as it provides extra steps of security. If the data is low risk, PMK is enabled by default and provides sufficient data security.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/virtual-machines/disk-encryption-overview",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/sse-boot-disk-cmk.html#",
"Terraform": "https://docs.bridgecrew.io/docs/bc_azr_general_1#terraform"
},
"Recommendation": {
"Text": "Note: Disks must be detached from VMs to have encryption changed. 1. Go to Virtual machines 2. For each virtual machine, go to Settings 3. Click on Disks 4. Click the ellipsis (...), then click Detach to detach the disk from the VM 5. Now search for Disks and locate the unattached disk 6. Click the disk then select Encryption 7. Change your encryption type, then select your encryption set 8. Click Save 9. Go back to the VM and re-attach the disk",
"Url": "https://learn.microsoft.com/en-us/azure/security/fundamentals/data-encryption-best-practices#protect-data-at-rest"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Using CMK/BYOK will entail additional management of keys."
}

View File

@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.vm.vm_client import vm_client
class vm_ensure_attached_disks_encrypted_with_cmk(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, disks in vm_client.disks.items():
for disk_id, disk in disks.items():
if disk.vms_attached:
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = disk.resource_name
report.resource_id = disk.resource_id
report.status_extended = f"Disk '{disk_id}' is encrypted with a customer-managed key in subscription {subscription_name}."
if (
not disk.encryption_type
or disk.encryption_type == "EncryptionAtRestWithPlatformKey"
):
report.status = "FAIL"
report.status_extended = f"Disk '{disk_id}' is not encrypted with a customer-managed key in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "vm_ensure_unattached_disks_encrypted_with_cmk",
"CheckTitle": "Ensure that 'Unattached disks' are encrypted with 'Customer Managed Key' (CMK)",
"CheckType": [],
"ServiceName": "vm",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.Compute/virtualMachines",
"Description": "Ensure that unattached disks in a subscription are encrypted with a Customer Managed Key (CMK).",
"Risk": "Managed disks are encrypted by default with Platform-managed keys. Using Customer-managed keys may provide an additional level of security or meet an organization's regulatory requirements. Encrypting managed disks ensures that its entire content is fully unrecoverable without a key and thus protects the volume from unwarranted reads. Even if the disk is not attached to any of the VMs, there is always a risk where a compromised user account with administrative access to VM service can mount/attach these data disks, which may lead to sensitive information disclosure and tampering.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/security/fundamentals/azure-disk-encryption-vms-vmss",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/sse-unattached-disk-cmk.html#",
"Terraform": ""
},
"Recommendation": {
"Text": "If data stored in the disk is no longer useful, refer to Azure documentation to delete unattached data disks at: https://learn.microsoft.com/en-us/rest/api/compute/disks/delete?view=rest-compute-2023-10-02&tabs=HTTP",
"Url": "https://learn.microsoft.com/en-us/azure/security/fundamentals/data-encryption-best-practices#protect-data-at-rest"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "You must have your key vault set up to utilize this. Encryption is available only on Standard tier VMs. This might cost you more. Utilizing and maintaining Customer-managed keys will require additional work to create, protect, and rotate keys."
}

View File

@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.vm.vm_client import vm_client
class vm_ensure_unattached_disks_encrypted_with_cmk(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, disks in vm_client.disks.items():
for disk_id, disk in disks.items():
if not disk.vms_attached:
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = disk.resource_name
report.resource_id = disk.resource_id
report.status_extended = f"Disk '{disk_id}' is encrypted with a customer-managed key in subscription {subscription_name}."
if (
not disk.encryption_type
or disk.encryption_type == "EncryptionAtRestWithPlatformKey"
):
report.status = "FAIL"
report.status_extended = f"Disk '{disk_id}' is not encrypted with a customer-managed key in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "vm_ensure_using_managed_disks",
"CheckTitle": "Ensure Virtual Machines are utilizing Managed Disks",
"CheckType": [],
"ServiceName": "vm",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.Compute/virtualMachines",
"Description": "Migrate blob-based VHDs to Managed Disks on Virtual Machines to exploit the default features of this configuration. The features include: 1. Default Disk Encryption 2. Resilience, as Microsoft will managed the disk storage and move around if underlying hardware goes faulty 3. Reduction of costs over storage accounts",
"Risk": "Managed disks are by default encrypted on the underlying hardware, so no additional encryption is required for basic protection. It is available if additional encryption is required. Managed disks are by design more resilient that storage accounts. For ARM-deployed Virtual Machines, Azure Adviser will at some point recommend moving VHDs to managed disks both from a security and cost management perspective.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/virtual-machines/unmanaged-disks-deprecation",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/managed-disks-in-use.html",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-virtual-machines-are-utilizing-managed-disks#terraform"
},
"Recommendation": {
"Text": "There are additional costs for managed disks based off of disk space allocated. When converting to managed disks, VMs will be powered off and back on.",
"Url": "https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-data-protection#dp-4-enable-data-at-rest-encryption-by-default"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "There are additional costs for managed disks based off of disk space allocated. When converting to managed disks, VMs will be powered off and back on."
}

View File

@@ -0,0 +1,40 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.vm.vm_client import vm_client
class vm_ensure_using_managed_disks(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, vms in vm_client.virtual_machines.items():
for vm_id, vm in vms.items():
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = vm.resource_name
report.resource_id = vm_id
report.status_extended = f"VM {vm.resource_name} is using managed disks in subscription {subscription_name}"
using_managed_disks = (
True
if vm.storage_profile
and getattr(vm.storage_profile, "os_disk", False)
and getattr(vm.storage_profile.os_disk, "managed_disk", False)
else False
)
if using_managed_disks and getattr(
vm.storage_profile, "data_disks", False
):
for data_disk in vm.storage_profile.data_disks:
if not getattr(data_disk, "managed_disk", False):
using_managed_disks = False
break
if not using_managed_disks:
report.status = "FAIL"
report.status_extended = f"VM {vm.resource_name} is not using managed disks in subscription {subscription_name}"
findings.append(report)
return findings

View File

@@ -0,0 +1,91 @@
from dataclasses import dataclass
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.compute.models import StorageProfile
from prowler.lib.logger import logger
from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
from prowler.providers.azure.lib.service.service import AzureService
########################## VirtualMachines
class VirtualMachines(AzureService):
def __init__(self, audit_info: Azure_Audit_Info):
super().__init__(ComputeManagementClient, audit_info)
self.virtual_machines = self.__get_virtual_machines__()
self.disks = self.__get_disks__()
def __get_virtual_machines__(self):
logger.info("VirtualMachines - Getting virtual machines...")
virtual_machines = {}
for subscription_name, client in self.clients.items():
try:
virtual_machines_list = client.virtual_machines.list_all()
virtual_machines.update({subscription_name: {}})
for vm in virtual_machines_list:
virtual_machines[subscription_name].update(
{
vm.vm_id: VirtualMachine(
resource_id=vm.id,
resource_name=vm.name,
storage_profile=getattr(vm, "storage_profile", None),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return virtual_machines
def __get_disks__(self):
logger.info("VirtualMachines - Getting disks...")
disks = {}
for subscription_name, client in self.clients.items():
try:
disks_list = client.disks.list()
disks.update({subscription_name: {}})
for disk in disks_list:
vms_attached = []
if disk.managed_by:
vms_attached.append(disk.managed_by)
if disk.managed_by_extended:
vms_attached.extend(disk.managed_by_extended)
disks[subscription_name].update(
{
disk.unique_id: Disk(
resource_id=disk.id,
resource_name=disk.name,
vms_attached=vms_attached,
encryption_type=getattr(
getattr(disk, "encryption", None), "type", None
),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return disks
@dataclass
class VirtualMachine:
resource_id: str
resource_name: str
storage_profile: StorageProfile
@dataclass
class Disk:
resource_id: str
resource_name: str
vms_attached: list[str]
encryption_type: str

View File

@@ -30,6 +30,7 @@ awsipranges = "0.3.3"
azure-identity = "1.15.0"
azure-mgmt-applicationinsights = "4.0.0"
azure-mgmt-authorization = "4.0.0"
azure-mgmt-compute = "30.5.0"
azure-mgmt-rdbms = "10.1.0"
azure-mgmt-cosmosdb = "9.4.0"
azure-mgmt-network = "25.2.0"

View File

@@ -0,0 +1,103 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.defender.defender_service import Assesment
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_defender_assessments_vm_endpoint_protection_installed:
def test_defender_no_subscriptions(self):
defender_client = mock.MagicMock
defender_client.assessments = {}
with mock.patch(
"prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed import (
defender_assessments_vm_endpoint_protection_installed,
)
check = defender_assessments_vm_endpoint_protection_installed()
result = check.execute()
assert len(result) == 0
def test_defender_subscriptions_with_no_assessments(self):
defender_client = mock.MagicMock
defender_client.assessments = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed import (
defender_assessments_vm_endpoint_protection_installed,
)
check = defender_assessments_vm_endpoint_protection_installed()
result = check.execute()
assert len(result) == 0
def test_defender_subscriptions_with_healthy_assessments(self):
defender_client = mock.MagicMock
resource_id = str(uuid4())
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"Install endpoint protection solution on virtual machines": Assesment(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed import (
defender_assessments_vm_endpoint_protection_installed,
)
check = defender_assessments_vm_endpoint_protection_installed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Endpoint protection is set up in all VMs in subscription {AZURE_SUBSCRIPTION}."
)
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id
def test_defender_subscriptions_with_unhealthy_assessments(self):
defender_client = mock.MagicMock
resource_id = str(uuid4())
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"Install endpoint protection solution on virtual machines": Assesment(
resource_id=resource_id,
resource_name="vm1",
status="Unhealthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_assessments_vm_endpoint_protection_installed.defender_assessments_vm_endpoint_protection_installed import (
defender_assessments_vm_endpoint_protection_installed,
)
check = defender_assessments_vm_endpoint_protection_installed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Endpoint protection is not set up in all VMs in subscription {AZURE_SUBSCRIPTION}."
)
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id

View File

@@ -0,0 +1,186 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.vm.vm_service import Disk
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_vm_ensure_attached_disks_encrypted_with_cmk:
def test_vm_no_subscriptions(self):
vm_client = mock.MagicMock
vm_client.disks = {}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0
def test_vm_subscription_empty(self):
vm_client = mock.MagicMock
vm_client.disks = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0
def test_vm_subscription_one_disk_attached_encrypt_pk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[uuid4()],
encryption_type="EncryptionAtRestWithPlatformKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "FAIL"
assert result[0].resource_id == resource_id
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id}' is not encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_subscription_one_disk_attached_encrypt_cmk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[uuid4()],
encryption_type="EncryptionAtRestWithCustomerKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "PASS"
assert result[0].resource_id == resource_id
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id}' is encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_subscription_two_disk_attached_encrypt_cmk_and_pk(self):
disk_id_1 = uuid4()
resource_id_1 = uuid4()
disk_id_2 = uuid4()
resource_id_2 = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id_1: Disk(
resource_id=resource_id_1,
resource_name="test-disk",
vms_attached=[uuid4()],
encryption_type="EncryptionAtRestWithPlatformKey",
),
disk_id_2: Disk(
resource_id=resource_id_2,
resource_name="test-disk-2",
vms_attached=[uuid4(), uuid4()],
encryption_type="EncryptionAtRestWithCustomerKey",
),
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 2
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "FAIL"
assert result[0].resource_id == resource_id_1
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id_1}' is not encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
assert result[1].status == "PASS"
assert result[1].resource_id == resource_id_2
assert result[1].resource_name == "test-disk-2"
assert (
result[1].status_extended
== f"Disk '{disk_id_2}' is encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_unattached_disk_encrypt_cmk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[],
encryption_type="EncryptionAtRestWithCustomerKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_attached_disks_encrypted_with_cmk.vm_ensure_attached_disks_encrypted_with_cmk import (
vm_ensure_attached_disks_encrypted_with_cmk,
)
check = vm_ensure_attached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,186 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.vm.vm_service import Disk
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_vm_ensure_unattached_disks_encrypted_with_cmk:
def test_vm_no_subscriptions(self):
vm_client = mock.MagicMock
vm_client.disks = {}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0
def test_vm_subscription_empty(self):
vm_client = mock.MagicMock
vm_client.disks = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0
def test_vm_one_unattached_disk_encrypt_pk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[],
encryption_type="EncryptionAtRestWithPlatformKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "FAIL"
assert result[0].resource_id == resource_id
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id}' is not encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_one_unattached_disk_encrypt_cmk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[],
encryption_type="EncryptionAtRestWithCustomerKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "PASS"
assert result[0].resource_id == resource_id
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id}' is encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_subscription_two_unattached_disk_encrypt_cmk_and_pk(self):
disk_id_1 = uuid4()
resource_id_1 = uuid4()
disk_id_2 = uuid4()
resource_id_2 = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id_1: Disk(
resource_id=resource_id_1,
resource_name="test-disk",
vms_attached=[],
encryption_type="EncryptionAtRestWithPlatformKey",
),
disk_id_2: Disk(
resource_id=resource_id_2,
resource_name="test-disk-2",
vms_attached=[],
encryption_type="EncryptionAtRestWithCustomerKey",
),
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 2
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].status == "FAIL"
assert result[0].resource_id == resource_id_1
assert result[0].resource_name == "test-disk"
assert (
result[0].status_extended
== f"Disk '{disk_id_1}' is not encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
assert result[1].status == "PASS"
assert result[1].resource_id == resource_id_2
assert result[1].resource_name == "test-disk-2"
assert (
result[1].status_extended
== f"Disk '{disk_id_2}' is encrypted with a customer-managed key in subscription {AZURE_SUBSCRIPTION}."
)
def test_vm_attached_disk_encrypt_cmk(self):
disk_id = uuid4()
resource_id = uuid4()
vm_client = mock.MagicMock
vm_client.disks = {
AZURE_SUBSCRIPTION: {
disk_id: Disk(
resource_id=resource_id,
resource_name="test-disk",
vms_attached=[uuid4()],
encryption_type="EncryptionAtRestWithCustomerKey",
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_unattached_disks_encrypted_with_cmk.vm_ensure_unattached_disks_encrypted_with_cmk import (
vm_ensure_unattached_disks_encrypted_with_cmk,
)
check = vm_ensure_unattached_disks_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,156 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.vm.vm_service import VirtualMachine
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_vm_ensure_using_managed_disks:
def test_vm_no_subscriptions(self):
vm_client = mock.MagicMock
vm_client.virtual_machines = {}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks import (
vm_ensure_using_managed_disks,
)
check = vm_ensure_using_managed_disks()
result = check.execute()
assert len(result) == 0
def test_vm_subscriptions(self):
vm_client = mock.MagicMock
vm_client.virtual_machines = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks import (
vm_ensure_using_managed_disks,
)
check = vm_ensure_using_managed_disks()
result = check.execute()
assert len(result) == 0
def test_vm_ensure_using_managed_disks(self):
vm_id = str(uuid4())
vm_client = mock.MagicMock
vm_client.virtual_machines = {
AZURE_SUBSCRIPTION: {
vm_id: VirtualMachine(
resource_id="/subscriptions/resource_id",
resource_name="VMTest",
storage_profile=mock.MagicMock(
os_disk=mock.MagicMock(
create_option="FromImage",
managed_disk=mock.MagicMock(id="managed_disk_id"),
),
data_disks=[],
),
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks import (
vm_ensure_using_managed_disks,
)
check = vm_ensure_using_managed_disks()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == "VMTest"
assert result[0].resource_id == vm_id
assert (
result[0].status_extended
== f"VM VMTest is using managed disks in subscription {AZURE_SUBSCRIPTION}"
)
def test_vm_using_not_managed_os_disk(self):
vm_id = str(uuid4())
vm_client = mock.MagicMock
vm_client.virtual_machines = {
AZURE_SUBSCRIPTION: {
vm_id: VirtualMachine(
resource_id="/subscriptions/resource_id",
resource_name="VMTest",
storage_profile=mock.MagicMock(
os_disk=mock.MagicMock(
create_option="FromImage",
managed_disk=None,
),
data_disks=[],
),
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks import (
vm_ensure_using_managed_disks,
)
check = vm_ensure_using_managed_disks()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == "VMTest"
assert result[0].resource_id == vm_id
assert (
result[0].status_extended
== f"VM VMTest is not using managed disks in subscription {AZURE_SUBSCRIPTION}"
)
def test_vm_using_not_managed_data_disks(self):
vm_id = str(uuid4())
vm_client = mock.MagicMock
vm_client.virtual_machines = {
AZURE_SUBSCRIPTION: {
vm_id: VirtualMachine(
resource_id="/subscriptions/resource_id",
resource_name="VMTest",
storage_profile=mock.MagicMock(
os_disk=mock.MagicMock(
create_option="FromImage",
managed_disk=mock.MagicMock(id="managed_disk_id"),
),
data_disks=[mock.MagicMock(managed_disk=None)],
),
)
}
}
with mock.patch(
"prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks.vm_client",
new=vm_client,
):
from prowler.providers.azure.services.vm.vm_ensure_using_managed_disks.vm_ensure_using_managed_disks import (
vm_ensure_using_managed_disks,
)
check = vm_ensure_using_managed_disks()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == "VMTest"
assert result[0].resource_id == vm_id
assert (
result[0].status_extended
== f"VM VMTest is not using managed disks in subscription {AZURE_SUBSCRIPTION}"
)

View File

@@ -0,0 +1,104 @@
from unittest.mock import patch
from azure.mgmt.compute.models import ManagedDiskParameters, OSDisk, StorageProfile
from prowler.providers.azure.services.vm.vm_service import (
Disk,
VirtualMachine,
VirtualMachines,
)
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION,
set_mocked_azure_audit_info,
)
def mock_vm_get_virtual_machines(_):
return {
AZURE_SUBSCRIPTION: {
"vm_id-1": VirtualMachine(
resource_id="/subscriptions/resource_id",
resource_name="VMTest",
storage_profile=StorageProfile(
os_disk=OSDisk(
create_option="FromImage",
managed_disk=ManagedDiskParameters(id="managed_disk_id"),
),
data_disks=[],
),
)
}
}
def mock_vm_get_disks(_):
return {
AZURE_SUBSCRIPTION: {
"disk_id-1": Disk(
resource_id="disk_id-1",
resource_name="DiskTest",
vms_attached=["managed_by"],
encryption_type="EncryptionAtRestWithPlatformKey",
)
}
}
@patch(
"prowler.providers.azure.services.vm.vm_service.VirtualMachines.__get_virtual_machines__",
new=mock_vm_get_virtual_machines,
)
@patch(
"prowler.providers.azure.services.vm.vm_service.VirtualMachines.__get_disks__",
new=mock_vm_get_disks,
)
class Test_AppInsights_Service:
def test__get_client__(self):
app_insights = VirtualMachines(set_mocked_azure_audit_info())
assert (
app_insights.clients[AZURE_SUBSCRIPTION].__class__.__name__
== "ComputeManagementClient"
)
def test__get_subscriptions__(self):
app_insights = VirtualMachines(set_mocked_azure_audit_info())
assert app_insights.subscriptions.__class__.__name__ == "dict"
def test__get_virtual_machines(self):
virtual_machines = VirtualMachines(set_mocked_azure_audit_info())
assert len(virtual_machines.virtual_machines) == 1
assert (
virtual_machines.virtual_machines[AZURE_SUBSCRIPTION]["vm_id-1"].resource_id
== "/subscriptions/resource_id"
)
assert (
virtual_machines.virtual_machines[AZURE_SUBSCRIPTION][
"vm_id-1"
].resource_name
== "VMTest"
)
assert (
virtual_machines.virtual_machines[AZURE_SUBSCRIPTION][
"vm_id-1"
].storage_profile.os_disk.managed_disk.id
== "managed_disk_id"
)
assert (
len(
virtual_machines.virtual_machines[AZURE_SUBSCRIPTION][
"vm_id-1"
].storage_profile.data_disks
)
== 0
)
def test__get_disks(self):
disks = VirtualMachines(set_mocked_azure_audit_info()).disks
assert len(disks) == 1
assert disks[AZURE_SUBSCRIPTION]["disk_id-1"].resource_id == "disk_id-1"
assert disks[AZURE_SUBSCRIPTION]["disk_id-1"].resource_name == "DiskTest"
assert disks[AZURE_SUBSCRIPTION]["disk_id-1"].vms_attached == ["managed_by"]
assert (
disks[AZURE_SUBSCRIPTION]["disk_id-1"].encryption_type
== "EncryptionAtRestWithPlatformKey"
)