feat(azure): Add check defender_ensure_system_updates_are_applied and defender_auto_provisioning_vulnerabilty_assessments_machines_on (#3327)

This commit is contained in:
Rubén De la Torre Vico
2024-01-31 12:29:45 +01:00
committed by GitHub
parent 48587bd034
commit 622bce9c52
11 changed files with 650 additions and 8 deletions

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "defender_auto_provisioning_vulnerabilty_assessments_machines_on",
"CheckTitle": "Ensure that Auto provisioning of 'Vulnerability assessment for machines' is Set to 'On'",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AzureDefenderPlan",
"Description": "Enable automatic provisioning of vulnerability assessment for machines on both Azure and hybrid (Arc enabled) machines.",
"Risk": "Vulnerability assessment for machines scans for various security-related configurations and events such as system updates, OS vulnerabilities, and endpoint protection, then produces alerts on threat and vulnerability findings.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/defender-for-cloud/enable-data-collection?tabs=autoprovision-va",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/SecurityCenter/automatic-provisioning-vulnerability-assessment-machines.html",
"Terraform": ""
},
"Recommendation": {
"Text": "1. From Azure Home select the Portal Menu. 2. Select Microsoft Defender for Cloud. 3. Then Environment Settings. 4. Select a subscription. 5. Click on Settings & Monitoring. 6. Ensure that Vulnerability assessment for machines is set to On. Repeat this for any additional subscriptions.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Additional licensing is required and configuration of Azure Arc introduces complexity beyond this recommendation."
}

View File

@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.defender.defender_client import defender_client
class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
assessments,
) in defender_client.assessments.items():
if (
"Machines should have a vulnerability assessment solution"
in assessments
):
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = assessments[
"Machines should have a vulnerability assessment solution"
].resource_name
report.resource_id = assessments[
"Machines should have a vulnerability assessment solution"
].resource_id
report.status_extended = f"Vulnerability assessment is set up in all VMs in subscription {subscription_name}."
if (
assessments[
"Machines should have a vulnerability assessment solution"
].status
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Vulnerability assessment is not set up in all VMs in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "defender_ensure_system_updates_are_applied",
"CheckTitle": "Ensure that Microsoft Defender Recommendation for 'Apply system updates' status is 'Completed'",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AzureDefenderRecommendation",
"Description": "Ensure that the latest OS patches for all virtual machines are applied.",
"Risk": "The Azure Security Center retrieves a list of available security and critical updates from Windows Update or Windows Server Update Services (WSUS), depending on which service is configured on a Windows VM. The security center also checks for the latest updates in Linux systems. If a VM is missing a system update, the security center will recommend system updates be applied.",
"RelatedUrl": "https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-posture-vulnerability-management#pv-7-rapidly-and-automatically-remediate-software-vulnerabilities",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/apply-latest-os-patches.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Follow Microsoft Azure documentation to apply security patches from the security center. Alternatively, you can employ your own patch assessment and management tool to periodically assess, report, and install the required security patches for your OS.",
"Url": "https://learn.microsoft.com/en-us/azure/virtual-machines/updates-maintenance-overview"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Running Microsoft Defender for Cloud incurs additional charges for each resource monitored. Please see attached reference for exact charges per hour."
}

View File

@@ -0,0 +1,50 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.defender.defender_client import defender_client
class defender_ensure_system_updates_are_applied(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
assessments,
) in defender_client.assessments.items():
if (
"Log Analytics agent should be installed on virtual machines"
in assessments
and "Machines should be configured to periodically check for missing system updates"
in assessments
and "System updates should be installed on your machines" in assessments
):
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = assessments[
"System updates should be installed on your machines"
].resource_name
report.resource_id = assessments[
"System updates should be installed on your machines"
].resource_id
report.status_extended = f"System updates are applied for all the VMs in the subscription {subscription_name}."
if (
assessments[
"Log Analytics agent should be installed on virtual machines"
].status
== "Unhealthy"
or assessments[
"Machines should be configured to periodically check for missing system updates"
].status
== "Unhealthy"
or assessments[
"System updates should be installed on your machines"
].status
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"System updates are not applied for all the VMs in the subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -14,16 +14,17 @@ class Defender(AzureService):
self.pricings = self.__get_pricings__()
self.auto_provisioning_settings = self.__get_auto_provisioning_settings__()
self.assessments = self.__get_assessments__()
def __get_pricings__(self):
logger.info("Defender - Getting pricings...")
pricings = {}
for subscription, client in self.clients.items():
for subscription_name, client in self.clients.items():
try:
pricings_list = client.pricings.list()
pricings.update({subscription: {}})
pricings.update({subscription_name: {}})
for pricing in pricings_list.value:
pricings[subscription].update(
pricings[subscription_name].update(
{
pricing.name: Defender_Pricing(
resource_id=pricing.id,
@@ -34,19 +35,19 @@ class Defender(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return pricings
def __get_auto_provisioning_settings__(self):
logger.info("Defender - Getting auto provisioning settings...")
auto_provisioning = {}
for subscription, client in self.clients.items():
for subscription_name, client in self.clients.items():
try:
auto_provisioning_settings = client.auto_provisioning_settings.list()
auto_provisioning.update({subscription: {}})
auto_provisioning.update({subscription_name: {}})
for ap in auto_provisioning_settings:
auto_provisioning[subscription].update(
auto_provisioning[subscription_name].update(
{
ap.name: AutoProvisioningSetting(
resource_id=ap.id,
@@ -57,12 +58,37 @@ class Defender(AzureService):
}
)
except Exception as error:
logger.error(f"Subscription name: {subscription}")
logger.error(f"Subscription name: {subscription_name}")
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return auto_provisioning
def __get_assessments__(self):
logger.info("Defender - Getting assessments...")
assessments = {}
for subscription_name, client in self.clients.items():
try:
assessments_list = client.assessments.list(
f"subscriptions/{self.subscriptions[subscription_name]}"
)
assessments.update({subscription_name: {}})
for assessment in assessments_list:
assessments[subscription_name].update(
{
assessment.display_name: Defender_Assessments(
resource_id=assessment.id,
resource_name=assessment.name,
status=assessment.status.code,
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return assessments
class Defender_Pricing(BaseModel):
resource_id: str
@@ -75,3 +101,9 @@ class AutoProvisioningSetting(BaseModel):
resource_name: str
resource_type: str
auto_provision: str
class Defender_Assessments(BaseModel):
resource_id: str
resource_name: str
status: str

View File

@@ -1,3 +1,41 @@
from uuid import uuid4
from azure.identity import DefaultAzureCredential
from prowler.providers.azure.lib.audit_info.models import (
Azure_Audit_Info,
Azure_Identity_Info,
Azure_Region_Config,
)
AZURE_SUSCRIPTION = str(uuid4())
# Azure Identity
IDENTITY_ID = "00000000-0000-0000-0000-000000000000"
IDENTITY_TYPE = "Service Principal"
TENANT_IDS = ["00000000-0000-0000-0000-000000000000"]
DOMAIN = "user.onmicrosoft.com"
# Mocked Azure Audit Info
def set_mocked_azure_audit_info(
credentials: DefaultAzureCredential = DefaultAzureCredential(),
identity: Azure_Identity_Info = Azure_Identity_Info(
identity_id=IDENTITY_ID,
identity_type=IDENTITY_TYPE,
tenant_ids=TENANT_IDS,
domain=DOMAIN,
subscriptions={AZURE_SUSCRIPTION: "id_subscription"},
),
audit_config: dict = None,
azure_region_config: Azure_Region_Config = Azure_Region_Config(),
):
audit_info = Azure_Audit_Info(
credentials=credentials,
identity=identity,
audit_metadata=None,
audit_resources=None,
audit_config=audit_config,
azure_region_config=azure_region_config,
)
return audit_info

View File

@@ -0,0 +1,91 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.defender.defender_service import (
Defender_Assessments,
)
from tests.providers.azure.azure_fixtures import AZURE_SUSCRIPTION
class Test_defender_auto_provisioning_vulnerabilty_assessments_machines_on:
def test_defender_no_app_services(self):
defender_client = mock.MagicMock
defender_client.assessments = {}
with mock.patch(
"prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on import (
defender_auto_provisioning_vulnerabilty_assessments_machines_on,
)
check = defender_auto_provisioning_vulnerabilty_assessments_machines_on()
result = check.execute()
assert len(result) == 0
def test_defender_machines_no_vulnerability_assessment_solution(self):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Machines should have a vulnerability assessment solution": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Unhealthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on import (
defender_auto_provisioning_vulnerabilty_assessments_machines_on,
)
check = defender_auto_provisioning_vulnerabilty_assessments_machines_on()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Vulnerability assessment is not set up in all VMs in subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id
def test_defender_machines_vulnerability_assessment_solution(self):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Machines should have a vulnerability assessment solution": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_auto_provisioning_vulnerabilty_assessments_machines_on.defender_auto_provisioning_vulnerabilty_assessments_machines_on import (
defender_auto_provisioning_vulnerabilty_assessments_machines_on,
)
check = defender_auto_provisioning_vulnerabilty_assessments_machines_on()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Vulnerability assessment is set up in all VMs in subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id

View File

@@ -0,0 +1,201 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.defender.defender_service import (
Defender_Assessments,
)
from tests.providers.azure.azure_fixtures import AZURE_SUSCRIPTION
class Test_defender_ensure_system_updates_are_applied:
def test_defender_no_app_services(self):
defender_client = mock.MagicMock
defender_client.assessments = {}
with mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied import (
defender_ensure_system_updates_are_applied,
)
check = defender_ensure_system_updates_are_applied()
result = check.execute()
assert len(result) == 0
def test_defender_machines_no_log_analytics_installed(self):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Log Analytics agent should be installed on virtual machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Unhealthy",
),
"Machines should be configured to periodically check for missing system updates": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"System updates should be installed on your machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied import (
defender_ensure_system_updates_are_applied,
)
check = defender_ensure_system_updates_are_applied()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"System updates are not applied for all the VMs in the subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id
def test_defender_machines_no_configured_to_periodically_check_for_system_updates(
self,
):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Log Analytics agent should be installed on virtual machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"Machines should be configured to periodically check for missing system updates": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Unhealthy",
),
"System updates should be installed on your machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied import (
defender_ensure_system_updates_are_applied,
)
check = defender_ensure_system_updates_are_applied()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"System updates are not applied for all the VMs in the subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id
def test_defender_machines_no_system_updates_installed(self):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Log Analytics agent should be installed on virtual machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"Machines should be configured to periodically check for missing system updates": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"System updates should be installed on your machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Unhealthy",
),
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied import (
defender_ensure_system_updates_are_applied,
)
check = defender_ensure_system_updates_are_applied()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"System updates are not applied for all the VMs in the subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id
def test_defender_machines_configured_to_periodically_check_for_system_updates_and_system_updates_installed(
self,
):
resource_id = str(uuid4())
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUSCRIPTION: {
"Log Analytics agent should be installed on virtual machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"Machines should be configured to periodically check for missing system updates": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
"System updates should be installed on your machines": Defender_Assessments(
resource_id=resource_id,
resource_name="vm1",
status="Healthy",
),
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_ensure_system_updates_are_applied.defender_ensure_system_updates_are_applied import (
defender_ensure_system_updates_are_applied,
)
check = defender_ensure_system_updates_are_applied()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"System updates are applied for all the VMs in the subscription {AZURE_SUSCRIPTION}."
)
assert result[0].subscription == AZURE_SUSCRIPTION
assert result[0].resource_name == "vm1"
assert result[0].resource_id == resource_id

View File

@@ -0,0 +1,131 @@
from datetime import timedelta
from unittest.mock import patch
from prowler.providers.azure.services.defender.defender_service import (
AutoProvisioningSetting,
Defender,
Defender_Assessments,
Defender_Pricing,
)
from tests.providers.azure.azure_fixtures import (
AZURE_SUSCRIPTION,
set_mocked_azure_audit_info,
)
def mock_defender_get_pricings(_):
return {
AZURE_SUSCRIPTION: {
"Standard": Defender_Pricing(
resource_id="resource_id",
pricing_tier="pricing_tier",
free_trial_remaining_time=timedelta(days=1),
)
}
}
def mock_defender_get_auto_provisioning_settings(_):
return {
AZURE_SUSCRIPTION: {
"default": AutoProvisioningSetting(
resource_id="/subscriptions/resource_id",
resource_name="default",
resource_type="Microsoft.Security/autoProvisioningSettings",
auto_provision="On",
)
}
}
def mock_defender_get_assessments(_):
return {
AZURE_SUSCRIPTION: {
"default": Defender_Assessments(
resource_id="/subscriptions/resource_id",
resource_name="default",
status="Healthy",
)
}
}
@patch(
"prowler.providers.azure.services.defender.defender_service.Defender.__get_pricings__",
new=mock_defender_get_pricings,
)
@patch(
"prowler.providers.azure.services.defender.defender_service.Defender.__get_auto_provisioning_settings__",
new=mock_defender_get_auto_provisioning_settings,
)
@patch(
"prowler.providers.azure.services.defender.defender_service.Defender.__get_assessments__",
new=mock_defender_get_assessments,
)
class Test_Defender_Service:
def test__get_client__(self):
defender = Defender(set_mocked_azure_audit_info())
assert (
defender.clients[AZURE_SUSCRIPTION].__class__.__name__ == "SecurityCenter"
)
def test__get_subscriptions__(self):
defender = Defender(set_mocked_azure_audit_info())
defender = Defender(set_mocked_azure_audit_info())
assert defender.subscriptions.__class__.__name__ == "dict"
def test__get_pricings__(self):
defender = Defender(set_mocked_azure_audit_info())
assert len(defender.pricings) == 1
assert (
defender.pricings[AZURE_SUSCRIPTION]["Standard"].resource_id
== "resource_id"
)
assert (
defender.pricings[AZURE_SUSCRIPTION]["Standard"].pricing_tier
== "pricing_tier"
)
assert defender.pricings[AZURE_SUSCRIPTION][
"Standard"
].free_trial_remaining_time == timedelta(days=1)
def test__get_auto_provisioning_settings__(self):
defender = Defender(set_mocked_azure_audit_info())
assert len(defender.auto_provisioning_settings) == 1
assert (
defender.auto_provisioning_settings[AZURE_SUSCRIPTION][
"default"
].resource_id
== "/subscriptions/resource_id"
)
assert (
defender.auto_provisioning_settings[AZURE_SUSCRIPTION][
"default"
].resource_name
== "default"
)
assert (
defender.auto_provisioning_settings[AZURE_SUSCRIPTION][
"default"
].resource_type
== "Microsoft.Security/autoProvisioningSettings"
)
assert (
defender.auto_provisioning_settings[AZURE_SUSCRIPTION][
"default"
].auto_provision
== "On"
)
def test__get_assessments__(self):
defender = Defender(set_mocked_azure_audit_info())
assert len(defender.assessments) == 1
assert (
defender.assessments[AZURE_SUSCRIPTION]["default"].resource_id
== "/subscriptions/resource_id"
)
assert (
defender.assessments[AZURE_SUSCRIPTION]["default"].resource_name
== "default"
)
assert defender.assessments[AZURE_SUSCRIPTION]["default"].status == "Healthy"