feat(azure): Azure new checks related with AKS (#3476)

This commit is contained in:
Rubén De la Torre Vico
2024-03-05 13:20:56 +00:00
committed by GitHub
parent 00ab5b5fc2
commit ddd43bae5d
33 changed files with 1380 additions and 3 deletions

18
poetry.lock generated
View File

@@ -391,6 +391,22 @@ azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0" azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0" isodate = ">=0.6.1,<1.0.0"
[[package]]
name = "azure-mgmt-containerservice"
version = "29.1.0"
description = "Microsoft Azure Container Service Management Client Library for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "azure-mgmt-containerservice-29.1.0.tar.gz", hash = "sha256:46887178bb1035933f06fa63121c1ac9d4c5871f202ae2b86bc4af6e1e3b354f"},
{file = "azure_mgmt_containerservice-29.1.0-py3-none-any.whl", hash = "sha256:0941a26a9c61930e004001e7340812dadb8a726e2ccc5b4d30ce4e6403fe1f43"},
]
[package.dependencies]
azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0"
[[package]] [[package]]
name = "azure-mgmt-core" name = "azure-mgmt-core"
version = "1.4.0" version = "1.4.0"
@@ -4409,4 +4425,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.9,<3.13" python-versions = ">=3.9,<3.13"
content-hash = "2321e887337f7d192280547782a7486b3bcc915f13f43bf6784f52aa8457aff8" content-hash = "7b095528333431a071ec109796a25a99993b7213cd5c41ea67d61a96e54042f8"

View File

@@ -0,0 +1,4 @@
from prowler.providers.azure.lib.audit_info.audit_info import azure_audit_info
from prowler.providers.azure.services.aks.aks_service import AKS
aks_client = AKS(azure_audit_info)

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "aks_cluster_rbac_enabled",
"CheckTitle": "Ensure AKS RBAC is enabled",
"CheckType": [],
"ServiceName": "aks",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.ContainerService/ManagedClusters",
"Description": "Azure Kubernetes Service (AKS) can be configured to use Azure Active Directory (AD) for user authentication. In this configuration, you sign in to an AKS cluster using an Azure AD authentication token. You can also configure Kubernetes role-based access control (Kubernetes RBAC) to limit access to cluster resources based a user's identity or group membership.",
"Risk": "Kubernetes RBAC and AKS help you secure your cluster access and provide only the minimum required permissions to developers and operators.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/aks/azure-ad-rbac?tabs=portal",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/AKS/enable-role-based-access-control-for-kubernetes-service.html#",
"Terraform": "https://docs.bridgecrew.io/docs/bc_azr_kubernetes_2#terraform"
},
"Recommendation": {
"Text": "",
"Url": "https://learn.microsoft.com/en-us/security/benchmark/azure/security-controls-v2-privileged-access#pa-7-follow-just-enough-administration-least-privilege-principle"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.aks.aks_client import aks_client
class aks_cluster_rbac_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = cluster.name
report.resource_id = cluster_id
report.status_extended = f"RBAC is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
if not cluster.rbac_enabled:
report.status = "FAIL"
report.status_extended = f"RBAC is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "aks_clusters_created_with_private_nodes",
"CheckTitle": "Ensure clusters are created with Private Nodes",
"CheckType": [],
"ServiceName": "aks",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.ContainerService/ManagedClusters",
"Description": "Disable public IP addresses for cluster nodes, so that they only have private IP addresses. Private Nodes are nodes with no public IP addresses.",
"Risk": "Disabling public IP addresses on cluster nodes restricts access to only internal networks, forcing attackers to obtain local network access before attempting to compromise the underlying Kubernetes hosts.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/aks/private-clusters",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "",
"Url": "https://learn.microsoft.com/en-us/azure/aks/access-private-cluster"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.aks.aks_client import aks_client
class aks_clusters_created_with_private_nodes(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = cluster.name
report.resource_id = cluster_id
report.status_extended = f"Cluster '{cluster.name}' was created with private nodes in subscription '{subscription_name}'"
for agent_pool in cluster.agent_pool_profiles:
if getattr(agent_pool, "enable_node_public_ip", True):
report.status = "FAIL"
report.status_extended = f"Cluster '{cluster.name}' was not created with private nodes in subscription '{subscription_name}'"
break
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "aks_clusters_public_access_disabled",
"CheckTitle": "Ensure clusters are created with Private Endpoint Enabled and Public Access Disabled",
"CheckType": [],
"ServiceName": "aks",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.ContainerService/ManagedClusters",
"Description": "Disable access to the Kubernetes API from outside the node network if it is not required.",
"Risk": "In a private cluster, the master node has two endpoints, a private and public endpoint. The private endpoint is the internal IP address of the master, behind an internal load balancer in the master's wirtual network. Nodes communicate with the master using the private endpoint. The public endpoint enables the Kubernetes API to be accessed from outside the master's virtual network. Although Kubernetes API requires an authorized token to perform sensitive actions, a vulnerability could potentially expose the Kubernetes publically with unrestricted access. Additionally, an attacker may be able to identify the current cluster and Kubernetes API version and determine whether it is vulnerable to an attack. Unless required, disabling public endpoint will help prevent such threats, and require the attacker to be on the master's virtual network to perform any attack on the Kubernetes API.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/aks/private-clusters?tabs=azure-portal",
"Remediation": {
"Code": {
"CLI": "az aks update -n <cluster_name> -g <resource_group> --disable-public-fqdn",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "To use a private endpoint, create a new private endpoint in your virtual network then create a link between your virtual network and a new private DNS zone",
"Url": "https://learn.microsoft.com/en-us/azure/aks/access-private-cluster?tabs=azure-cli"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.aks.aks_client import aks_client
class aks_clusters_public_access_disabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = cluster.name
report.resource_id = cluster_id
report.status_extended = f"Public access to nodes is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
if cluster.private_fqdn:
for agent_pool in cluster.agent_pool_profiles:
if not getattr(agent_pool, "enable_node_public_ip", False):
report.status = "PASS"
report.status_extended = f"Public access to nodes is disabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "aks_network_policy_enabled",
"CheckTitle": "Ensure Network Policy is Enabled and set as appropriate",
"CheckType": [],
"ServiceName": "aks",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.ContainerService/managedClusters",
"Description": "When you run modern, microservices-based applications in Kubernetes, you often want to control which components can communicate with each other. The principle of least privilege should be applied to how traffic can flow between pods in an Azure Kubernetes Service (AKS) cluster. Let's say you likely want to block traffic directly to back-end applications. The Network Policy feature in Kubernetes lets you define rules for ingress and egress traffic between pods in a cluster.",
"Risk": "All pods in an AKS cluster can send and receive traffic without limitations, by default. To improve security, you can define rules that control the flow of traffic. Back-end applications are often only exposed to required front-end services, for example. Or, database components are only accessible to the application tiers that connect to them. Network Policy is a Kubernetes specification that defines access policies for communication between Pods. Using Network Policies, you define an ordered set of rules to send and receive traffic and apply them to a collection of pods that match one or more label selectors. These network policy rules are defined as YAML manifests. Network policies can be included as part of a wider manifest that also creates a deployment or service.",
"RelatedUrl": "https://learn.microsoft.com/en-us/security/benchmark/azure/security-controls-v2-network-security#ns-2-connect-private-networks-together",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/bc_azr_kubernetes_4#terraform"
},
"Recommendation": {
"Text": "",
"Url": "https://learn.microsoft.com/en-us/azure/aks/use-network-policies"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Network Policy requires the Network Policy add-on. This add-on is included automatically when a cluster with Network Policy is created, but for an existing cluster, needs to be added prior to enabling Network Policy. Enabling/Disabling Network Policy causes a rolling update of all cluster nodes, similar to performing a cluster upgrade. This operation is long-running and will block other operations on the cluster (including delete) until it has run to completion. If Network Policy is used, a cluster must have at least 2 nodes of type n1-standard-1 or higher. The recommended minimum size cluster to run Network Policy enforcement is 3 n1-standard-1 instances. Enabling Network Policy enforcement consumes additional resources in nodes. Specifically, it increases the memory footprint of the kube-system process by approximately 128MB, and requires approximately 300 millicores of CPU."
}

View File

@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.aks.aks_client import aks_client
class aks_network_policy_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = cluster.name
report.resource_id = cluster_id
report.status_extended = f"Network policy is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
if not getattr(cluster, "network_policy", False):
report.status = "FAIL"
report.status_extended = f"Network policy is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
findings.append(report)
return findings

View File

@@ -0,0 +1,65 @@
from dataclasses import dataclass
from azure.mgmt.containerservice import ContainerServiceClient
from azure.mgmt.containerservice.models import ManagedClusterAgentPoolProfile
from prowler.lib.logger import logger
from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
from prowler.providers.azure.lib.service.service import AzureService
########################## AKS (Azure Kubernetes Service)
class AKS(AzureService):
def __init__(self, audit_info: Azure_Audit_Info):
super().__init__(ContainerServiceClient, audit_info)
self.clusters = self.__get_clusters__()
def __get_clusters__(self):
logger.info("AKS - Getting clusters...")
clusters = {}
for subscription_name, client in self.clients.items():
try:
clusters_list = client.managed_clusters.list()
clusters.update({subscription_name: {}})
for cluster in clusters_list:
if getattr(cluster, "kubernetes_version", None):
clusters[subscription_name].update(
{
cluster.id: Cluster(
name=cluster.name,
public_fqdn=cluster.fqdn,
private_fqdn=cluster.private_fqdn,
network_policy=(
getattr(
cluster.network_profile,
"network_policy",
None,
)
if getattr(cluster, "network_profile", None)
else None
),
agent_pool_profiles=getattr(
cluster, "agent_pool_profiles", []
),
rbac_enabled=getattr(cluster, "enable_rbac", False),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return clusters
@dataclass
class Cluster:
name: str
public_fqdn: str
private_fqdn: str
network_policy: str
agent_pool_profiles: list[ManagedClusterAgentPoolProfile]
rbac_enabled: bool

View File

@@ -10,7 +10,6 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
subscription_name, subscription_name,
assessments, assessments,
) in defender_client.assessments.items(): ) in defender_client.assessments.items():
pass
if ( if (
"Install endpoint protection solution on virtual machines" "Install endpoint protection solution on virtual machines"

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "defender_container_images_resolved_vulnerabilities",
"CheckTitle": "Container images used by containers should have vulnerabilities resolved",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.Security/assessments",
"Description": "Container images used by containers should have vulnerabilities resolved. Azure Defender for Container Registries can help you identify and resolve vulnerabilities in your container images. It provides vulnerability scanning and prioritized security recommendations for your container images. You can use Azure Defender for Container Registries to scan your container images for vulnerabilities and get prioritized security recommendations to resolve them. You can also use Azure Defender for Container Registries to monitor your container registries for security threats and get prioritized security recommendations to resolve them. Azure Defender for Container Registries integrates with Azure Security Center to provide a unified view of security across your container registries and other Azure resources. Azure Defender for Container Registries is part of Azure Defender, which provides advanced threat protection for your hybrid workloads. Azure Defender uses advanced analytics and global threat intelligence to detect attacks that might otherwise go unnoticed.",
"Risk": "If vulnerabilities are not resolved, attackers can exploit them to gain unauthorized access to your containerized applications and data.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/container-registry/container-registry-check-health",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "",
"Url": "https://learn.microsoft.com/en-us/azure/container-registry/scan-images-defender"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,47 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.defender.defender_client import defender_client
class defender_container_images_resolved_vulnerabilities(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
assessments,
) in defender_client.assessments.items():
if (
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
in assessments
and getattr(
assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
],
"status",
"NotApplicable",
)
!= "NotApplicable"
):
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_name
report.resource_id = assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_id
report.status_extended = f"Azure running container images do not have unresolved vulnerabilities in subscription '{subscription_name}'."
if (
assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].status
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Azure running container images have unresolved vulnerabilities in subscription '{subscription_name}'."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "defender_container_images_scan_enabled",
"CheckTitle": "Ensure Image Vulnerability Scanning using Azure Defender image scanning or a third party provider",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.Security",
"Description": "Scan images being deployed to Azure (AKS) for vulnerabilities. Vulnerability scanning for images stored in Azure Container Registry is generally available in Azure Security Center. This capability is powered by Qualys, a leading provider of information security. When you push an image to Container Registry, Security Center automatically scans it, then checks for known vulnerabilities in packages or dependencies defined in the file. When the scan completes (after about 10 minutes), Security Center provides details and a security classification for each vulnerability detected, along with guidance on how to remediate issues and protect vulnerable attack surfaces.",
"Risk": "Vulnerabilities in software packages can be exploited by hackers or malicious users to obtain unauthorized access to local cloud resources. Azure Defender and other third party products allow images to be scanned for known vulnerabilities.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/container-registry/container-registry-check-health",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "",
"Url": "https://learn.microsoft.com/en-us/azure/container-registry/scan-images-defender"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "When using an Azure container registry, you might occasionally encounter problems. For example, you might not be able to pull a container image because of an issue with Docker in your local environment. Or, a network issue might prevent you from connecting to the registry."
}

View File

@@ -0,0 +1,25 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.defender.defender_client import defender_client
class defender_container_images_scan_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
if "Containers" in pricings:
report = Check_Report_Azure(self.metadata())
report.status = "PASS"
report.subscription = subscription
report.resource_id = pricings["Containers"].resource_id
report.resource_name = "Dender plan for Containers"
report.status_extended = (
f"Container image scan is enabled in subscription {subscription}."
)
if not pricings["Containers"].extensions.get(
"ContainerRegistriesVulnerabilityAssessments"
):
report.status = "FAIL"
report.status_extended = f"Container image scan is disabled in subscription {subscription}."
findings.append(report)
return findings

View File

@@ -1,4 +1,5 @@
from datetime import timedelta from datetime import timedelta
from typing import Dict
from azure.core.exceptions import HttpResponseError from azure.core.exceptions import HttpResponseError
from azure.mgmt.security import SecurityCenter from azure.mgmt.security import SecurityCenter
@@ -32,8 +33,18 @@ class Defender(AzureService):
{ {
pricing.name: Pricing( pricing.name: Pricing(
resource_id=pricing.id, resource_id=pricing.id,
pricing_tier=pricing.pricing_tier, pricing_tier=getattr(pricing, "pricing_tier", None),
free_trial_remaining_time=pricing.free_trial_remaining_time, free_trial_remaining_time=pricing.free_trial_remaining_time,
extensions=dict(
[
(extension.name, extension.is_enabled)
for extension in (
pricing.extensions
if getattr(pricing, "extensions", None)
else []
)
]
),
) )
} }
) )
@@ -192,6 +203,7 @@ class Pricing(BaseModel):
resource_id: str resource_id: str
pricing_tier: str pricing_tier: str
free_trial_remaining_time: timedelta free_trial_remaining_time: timedelta
extensions: Dict[str, bool] = {}
class AutoProvisioningSetting(BaseModel): class AutoProvisioningSetting(BaseModel):

View File

@@ -32,6 +32,7 @@ azure-keyvault-keys = "4.9.0"
azure-mgmt-applicationinsights = "4.0.0" azure-mgmt-applicationinsights = "4.0.0"
azure-mgmt-authorization = "4.0.0" azure-mgmt-authorization = "4.0.0"
azure-mgmt-compute = "30.5.0" azure-mgmt-compute = "30.5.0"
azure-mgmt-containerservice = "29.1.0"
azure-mgmt-cosmosdb = "9.4.0" azure-mgmt-cosmosdb = "9.4.0"
azure-mgmt-keyvault = "10.3.0" azure-mgmt-keyvault = "10.3.0"
azure-mgmt-monitor = "6.0.2" azure-mgmt-monitor = "6.0.2"

View File

@@ -0,0 +1,111 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.aks.aks_service import Cluster
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_aks_cluster_rbac_enabled:
def test_aks_no_subscriptions(self):
aks_client = mock.MagicMock
aks_client.clusters = {}
with mock.patch(
"prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled import (
aks_cluster_rbac_enabled,
)
check = aks_cluster_rbac_enabled()
result = check.execute()
assert len(result) == 0
def test_aks_subscription_empty(self):
aks_client = mock.MagicMock
aks_client.clusters = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled import (
aks_cluster_rbac_enabled,
)
check = aks_cluster_rbac_enabled()
result = check.execute()
assert len(result) == 0
def test_aks_cluster_rbac_enabled(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn=None,
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled import (
aks_cluster_rbac_enabled,
)
check = aks_cluster_rbac_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"RBAC is enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'."
)
assert result[0].resource_name == "cluster_name"
assert result[0].resource_id == cluster_id
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_rbac_not_enabled(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn=None,
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=False,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_cluster_rbac_enabled.aks_cluster_rbac_enabled import (
aks_cluster_rbac_enabled,
)
check = aks_cluster_rbac_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"RBAC is not enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'."
)
assert result[0].resource_name == "cluster_name"
assert result[0].resource_id == cluster_id
assert result[0].subscription == AZURE_SUBSCRIPTION

View File

@@ -0,0 +1,151 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.aks.aks_service import Cluster
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_aks_clusters_created_with_private_nodes:
def test_aks_no_subscriptions(self):
aks_client = mock.MagicMock
aks_client.clusters = {}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes import (
aks_clusters_created_with_private_nodes,
)
check = aks_clusters_created_with_private_nodes()
result = check.execute()
assert len(result) == 0
def test_aks_subscription_empty(self):
aks_client = mock.MagicMock
aks_client.clusters = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes import (
aks_clusters_created_with_private_nodes,
)
check = aks_clusters_created_with_private_nodes()
result = check.execute()
assert len(result) == 0
def test_aks_cluster_no_private_nodes(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="",
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=True)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes import (
aks_clusters_created_with_private_nodes,
)
check = aks_clusters_created_with_private_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Cluster 'cluster_name' was not created with private nodes in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_cluster_private_nodes(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="private_fqdn",
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes import (
aks_clusters_created_with_private_nodes,
)
check = aks_clusters_created_with_private_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Cluster 'cluster_name' was created with private nodes in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_cluster_public_and_private_nodes(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="private_fqdn",
network_policy="network_policy",
agent_pool_profiles=[
mock.MagicMock(enable_node_public_ip=False),
mock.MagicMock(enable_node_public_ip=True),
mock.MagicMock(enable_node_public_ip=False),
],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_created_with_private_nodes.aks_clusters_created_with_private_nodes import (
aks_clusters_created_with_private_nodes,
)
check = aks_clusters_created_with_private_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Cluster 'cluster_name' was not created with private nodes in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION

View File

@@ -0,0 +1,147 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.aks.aks_service import Cluster
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_aks_clusters_public_access_disabled:
def test_aks_no_subscriptions(self):
aks_client = mock.MagicMock
aks_client.clusters = {}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled import (
aks_clusters_public_access_disabled,
)
check = aks_clusters_public_access_disabled()
result = check.execute()
assert len(result) == 0
def test_aks_subscription_empty(self):
aks_client = mock.MagicMock
aks_client.clusters = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled import (
aks_clusters_public_access_disabled,
)
check = aks_clusters_public_access_disabled()
result = check.execute()
assert len(result) == 0
def test_aks_cluster_public_fqdn(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn=None,
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled import (
aks_clusters_public_access_disabled,
)
check = aks_clusters_public_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Public access to nodes is enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_cluster_private_fqdn(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="private_fqdn",
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled import (
aks_clusters_public_access_disabled,
)
check = aks_clusters_public_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Public access to nodes is disabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_cluster_private_fqdn_with_public_ip(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="private_fqdn",
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=True)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_clusters_public_access_disabled.aks_clusters_public_access_disabled import (
aks_clusters_public_access_disabled,
)
check = aks_clusters_public_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Public access to nodes is enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'"
)
assert result[0].resource_id == cluster_id
assert result[0].resource_name == "cluster_name"
assert result[0].subscription == AZURE_SUBSCRIPTION

View File

@@ -0,0 +1,111 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.aks.aks_service import Cluster
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_aks_network_policy_enabled:
def test_aks_no_subscriptions(self):
aks_client = mock.MagicMock
aks_client.clusters = {}
with mock.patch(
"prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled import (
aks_network_policy_enabled,
)
check = aks_network_policy_enabled()
result = check.execute()
assert len(result) == 0
def test_aks_subscription_empty(self):
aks_client = mock.MagicMock
aks_client.clusters = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled import (
aks_network_policy_enabled,
)
check = aks_network_policy_enabled()
result = check.execute()
assert len(result) == 0
def test_aks_network_policy_enabled(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn=None,
network_policy="network_policy",
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled import (
aks_network_policy_enabled,
)
check = aks_network_policy_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Network policy is enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'."
)
assert result[0].resource_name == "cluster_name"
assert result[0].resource_id == cluster_id
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_aks_network_policy_disabled(self):
aks_client = mock.MagicMock
cluster_id = str(uuid4())
aks_client.clusters = {
AZURE_SUBSCRIPTION: {
cluster_id: Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn=None,
network_policy=None,
agent_pool_profiles=[mock.MagicMock(enable_node_public_ip=False)],
rbac_enabled=True,
)
}
}
with mock.patch(
"prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled.aks_client",
new=aks_client,
):
from prowler.providers.azure.services.aks.aks_network_policy_enabled.aks_network_policy_enabled import (
aks_network_policy_enabled,
)
check = aks_network_policy_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Network policy is not enabled for cluster 'cluster_name' in subscription '{AZURE_SUBSCRIPTION}'."
)
assert result[0].resource_name == "cluster_name"
assert result[0].resource_id == cluster_id
assert result[0].subscription == AZURE_SUBSCRIPTION

View File

@@ -0,0 +1,60 @@
from unittest.mock import patch
from prowler.providers.azure.services.aks.aks_service import AKS, Cluster
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION,
set_mocked_azure_audit_info,
)
def mock_aks_get_clusters(_):
return {
AZURE_SUBSCRIPTION: {
"cluster_id-1": Cluster(
name="cluster_name",
public_fqdn="public_fqdn",
private_fqdn="private_fqdn",
network_policy="network_policy",
agent_pool_profiles=[],
rbac_enabled=True,
)
}
}
@patch(
"prowler.providers.azure.services.aks.aks_service.AKS.__get_clusters__",
new=mock_aks_get_clusters,
)
class Test_AppInsights_Service:
def test__get_client__(self):
aks = AKS(set_mocked_azure_audit_info())
assert (
aks.clients[AZURE_SUBSCRIPTION].__class__.__name__
== "ContainerServiceClient"
)
def test__get_subscriptions__(self):
aks = AKS(set_mocked_azure_audit_info())
assert aks.subscriptions.__class__.__name__ == "dict"
def test__get_components__(self):
aks = AKS(set_mocked_azure_audit_info())
assert len(aks.clusters) == 1
assert aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].name == "cluster_name"
assert (
aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].public_fqdn
== "public_fqdn"
)
assert (
aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].private_fqdn
== "private_fqdn"
)
assert (
aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].network_policy
== "network_policy"
)
assert (
aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].agent_pool_profiles == []
)
assert aks.clusters[AZURE_SUBSCRIPTION]["cluster_id-1"].rbac_enabled

View File

@@ -0,0 +1,171 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.defender.defender_service import Assesment
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_defender_container_images_resolved_vulnerabilities:
def test_defender_no_subscriptions(self):
defender_client = mock.MagicMock
defender_client.assessments = {}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_empty(self):
defender_client = mock.MagicMock
defender_client.assessments = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_no_assesment(self):
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"": Assesment(
resource_id=str(uuid4()),
resource_name=str(uuid4()),
status="Unhealthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_assesment_unhealthy(self):
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)": Assesment(
resource_id=str(uuid4()),
resource_name=str(uuid4()),
status="Unhealthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].resource_id
== defender_client.assessments[AZURE_SUBSCRIPTION][
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_id
)
assert (
result[0].resource_name
== defender_client.assessments[AZURE_SUBSCRIPTION][
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_name
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert (
result[0].status_extended
== f"Azure running container images have unresolved vulnerabilities in subscription '{AZURE_SUBSCRIPTION}'."
)
def test_defender_subscription_assesment_healthy(self):
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)": Assesment(
resource_id=str(uuid4()),
resource_name=str(uuid4()),
status="Healthy",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].resource_id
== defender_client.assessments[AZURE_SUBSCRIPTION][
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_id
)
assert (
result[0].resource_name
== defender_client.assessments[AZURE_SUBSCRIPTION][
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
].resource_name
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert (
result[0].status_extended
== f"Azure running container images do not have unresolved vulnerabilities in subscription '{AZURE_SUBSCRIPTION}'."
)
def test_defender_subscription_assesment_not_applicable(self):
defender_client = mock.MagicMock
defender_client.assessments = {
AZURE_SUBSCRIPTION: {
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)": Assesment(
resource_id=str(uuid4()),
resource_name=str(uuid4()),
status="NotApplicable",
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_resolved_vulnerabilities.defender_container_images_resolved_vulnerabilities import (
defender_container_images_resolved_vulnerabilities,
)
check = defender_container_images_resolved_vulnerabilities()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,175 @@
from datetime import timedelta
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.defender.defender_service import Pricing
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_defender_container_images_scan_enabled:
def test_defender_no_subscriptions(self):
defender_client = mock.MagicMock
defender_client.pricings = {}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_empty(self):
defender_client = mock.MagicMock
defender_client.pricings = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_no_containers(self):
defender_client = mock.MagicMock
defender_client.pricings = {
AZURE_SUBSCRIPTION: {
"NotContainers": Pricing(
resource_id=str(uuid4()),
pricing_tier="Free",
free_trial_remaining_time=timedelta(days=1),
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 0
def test_defender_subscription_containers_no_extensions(self):
defender_client = mock.MagicMock
defender_client.pricings = {
AZURE_SUBSCRIPTION: {
"Containers": Pricing(
resource_id=str(uuid4()),
pricing_tier="Free",
free_trial_remaining_time=timedelta(days=1),
extensions={},
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Container image scan is disabled in subscription {AZURE_SUBSCRIPTION}."
)
assert (
result[0].resource_id
== defender_client.pricings[AZURE_SUBSCRIPTION][
"Containers"
].resource_id
)
assert result[0].resource_name == "Dender plan for Containers"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_defender_subscription_containers_container_images_scan_off(self):
defender_client = mock.MagicMock
defender_client.pricings = {
AZURE_SUBSCRIPTION: {
"Containers": Pricing(
resource_id=str(uuid4()),
pricing_tier="Free",
free_trial_remaining_time=timedelta(days=1),
extensions={"ContainerRegistriesVulnerabilityAssessments": False},
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Container image scan is disabled in subscription {AZURE_SUBSCRIPTION}."
)
assert (
result[0].resource_id
== defender_client.pricings[AZURE_SUBSCRIPTION][
"Containers"
].resource_id
)
assert result[0].resource_name == "Dender plan for Containers"
assert result[0].subscription == AZURE_SUBSCRIPTION
def test_defender_subscription_containers_container_images_scan_on(self):
defender_client = mock.MagicMock
defender_client.pricings = {
AZURE_SUBSCRIPTION: {
"Containers": Pricing(
resource_id=str(uuid4()),
pricing_tier="Free",
free_trial_remaining_time=timedelta(days=1),
extensions={"ContainerRegistriesVulnerabilityAssessments": True},
)
}
}
with mock.patch(
"prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled.defender_client",
new=defender_client,
):
from prowler.providers.azure.services.defender.defender_container_images_scan_enabled.defender_container_images_scan_enabled import (
defender_container_images_scan_enabled,
)
check = defender_container_images_scan_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"Container image scan is enabled in subscription {AZURE_SUBSCRIPTION}."
)
assert (
result[0].resource_id
== defender_client.pricings[AZURE_SUBSCRIPTION][
"Containers"
].resource_id
)
assert result[0].resource_name == "Dender plan for Containers"
assert result[0].subscription == AZURE_SUBSCRIPTION

View File

@@ -23,6 +23,7 @@ def mock_defender_get_pricings(_):
resource_id="resource_id", resource_id="resource_id",
pricing_tier="pricing_tier", pricing_tier="pricing_tier",
free_trial_remaining_time=timedelta(days=1), free_trial_remaining_time=timedelta(days=1),
extensions={},
) )
} }
} }
@@ -143,6 +144,7 @@ class Test_Defender_Service:
assert defender.pricings[AZURE_SUBSCRIPTION][ assert defender.pricings[AZURE_SUBSCRIPTION][
"Standard" "Standard"
].free_trial_remaining_time == timedelta(days=1) ].free_trial_remaining_time == timedelta(days=1)
assert defender.pricings[AZURE_SUBSCRIPTION]["Standard"].extensions == {}
def test__get_auto_provisioning_settings__(self): def test__get_auto_provisioning_settings__(self):
defender = Defender(set_mocked_azure_audit_info()) defender = Defender(set_mocked_azure_audit_info())