feat(azure): New Azure checks related to CosmosDB (#3386)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pedro Martín
2024-02-13 13:53:36 +01:00
committed by GitHub
parent 4740a7b930
commit 355f589e5a
18 changed files with 686 additions and 1 deletions

18
poetry.lock generated
View File

@@ -372,6 +372,22 @@ files = [
[package.dependencies]
azure-core = ">=1.26.2,<2.0.0"
[[package]]
name = "azure-mgmt-cosmosdb"
version = "9.4.0"
description = "Microsoft Azure Cosmos DB Management Client Library for Python"
optional = false
python-versions = ">=3.7"
files = [
{file = "azure-mgmt-cosmosdb-9.4.0.tar.gz", hash = "sha256:cabb821cd446b09e73d24c31c287a60fcc3623488f1ffa9335c692267e79c341"},
{file = "azure_mgmt_cosmosdb-9.4.0-py3-none-any.whl", hash = "sha256:8ce9ab58df018980c4cf8defb38022fa5f2a9dcbccdeb73e952374cbaff919c5"},
]
[package.dependencies]
azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0"
[[package]]
name = "azure-mgmt-security"
version = "6.0.0"
@@ -4221,4 +4237,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.13"
content-hash = "e07cca1a69753a598ad37052f19c8c839d52d6bffa7bdd9ee86e088a49870d61"
content-hash = "9db98b41290bacf3b089ef0ca385fd5a2f9b040d1b12de4161c27e5f0152d25b"

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "cosmosdb_account_firewall_use_selected_networks",
"CheckTitle": "Ensure That 'Firewalls & Networks' Is Limited to Use Selected Networks Instead of All Networks",
"CheckType": [],
"ServiceName": "cosmosdb",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "CosmosDB",
"Description": "Limiting your Cosmos DB to only communicate on whitelisted networks lowers its attack footprint.",
"Risk": "Selecting certain networks for your Cosmos DB to communicate restricts the number of networks including the internet that can interact with what is stored within the database.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-configure-private-endpoints",
"Remediation": {
"Code": {
"CLI": "az cosmosdb database list / az cosmosdb show <database id> **isVirtualNetworkFilterEnabled should be set to true**",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "1. Open the portal menu. 2. Select the Azure Cosmos DB blade. 3. Select a Cosmos DB account to audit. 4. Select Networking. 5. Under Public network access, select Selected networks. 6. Under Virtual networks, select + Add existing virtual network or + Add a new virtual network. 7. For existing networks, select subscription, virtual network, subnet and click Add. For new networks, provide a name, update the default values if required, and click Create. 8. Click Save.",
"Url": "https://learn.microsoft.com/en-us/azure/storage/common/storage-network-security?tabs=azure-portal"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Failure to whitelist the correct networks will result in a connection loss."
}

View File

@@ -0,0 +1,21 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.cosmosdb.cosmosdb_client import cosmosdb_client
class cosmosdb_account_firewall_use_selected_networks(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
for account in accounts:
report = Check_Report_Azure(self.metadata())
report.subscription = subscription
report.resource_name = account.name
report.resource_id = account.id
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access from all networks."
if account.is_virtual_network_filter_enabled:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access only from selected networks."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "cosmosdb_account_use_aad_and_rbac",
"CheckTitle": "Use Azure Active Directory (AAD) Client Authentication and Azure RBAC where possible.",
"CheckType": [],
"ServiceName": "cosmosdb",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "CosmosDB",
"Description": "Cosmos DB can use tokens or AAD for client authentication which in turn will use Azure RBAC for authorization. Using AAD is significantly more secure because AAD handles the credentials and allows for MFA and centralized management, and the Azure RBAC better integrated with the rest of Azure.",
"Risk": "AAD client authentication is considerably more secure than token-based authentication because the tokens must be persistent at the client. AAD does not require this.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/cosmos-db/role-based-access-control",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Map all the resources that currently access to the Azure Cosmos DB account with keys or access tokens. Create an Azure Active Directory (AAD) identity for each of these resources: For Azure resources, you can create a managed identity . You may choose between system-assigned and user-assigned managed identities. For non-Azure resources, create an AAD identity. Grant each AAD identity the minimum permission it requires. When possible, we recommend you use one of the 2 built-in role definitions: Cosmos DB Built-in Data Reader or Cosmos DB Built-in Data Contributor. Validate that the new resource is functioning correctly. After new permissions are granted to identities, it may take a few hours until they propagate. When all resources are working correctly with the new identities, continue to the next step. You can use the az resource update powershell command: $cosmosdbname = 'cosmos-db-account-name' $resourcegroup = 'resource-group-name' $cosmosdb = az cosmosdb show --name $cosmosdbname --resource-group $resourcegroup | ConvertFrom-Json az resource update --ids $cosmosdb.id --set properties.disableLocalAuth=true --latest- include-preview",
"Url": "https://learn.microsoft.com/en-us/azure/cosmos-db/role-based-access-control"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,21 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.cosmosdb.cosmosdb_client import cosmosdb_client
class cosmosdb_account_use_aad_and_rbac(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
for account in accounts:
report = Check_Report_Azure(self.metadata())
report.subscription = subscription
report.resource_name = account.name
report.resource_id = account.id
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using AAD and RBAC"
if account.disable_local_auth:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using AAD and RBAC"
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "cosmosdb_account_use_private_endpoints",
"CheckTitle": "Ensure That Private Endpoints Are Used Where Possible",
"CheckType": [],
"ServiceName": "cosmosdb",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "CosmosDB",
"Description": "Private endpoints limit network traffic to approved sources.",
"Risk": "For sensitive data, private endpoints allow granular control of which services can communicate with Cosmos DB and ensure that this network traffic is private. You set this up on a case by case basis for each service you wish to be connected.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-configure-private-endpoints",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "1. Open the portal menu. 2. Select the Azure Cosmos DB blade. 3. Select the Azure Cosmos DB account. 4. Select Networking. 5. Select Private access. 6. Click + Private Endpoint. 7. Provide a Name. 8. Click Next. 9. From the Resource type drop down, select Microsoft.AzureCosmosDB/databaseAccounts. 10. From the Resource drop down, select the Cosmos DB account. 11. Click Next. 12. Provide appropriate Virtual Network details. 13. Click Next. 14. Provide appropriate DNS details. 15. Click Next. 16. Optionally provide Tags. 17. Click Next : Review + create. 18. Click Create.",
"Url": "https://docs.microsoft.com/en-us/azure/private-link/tutorial-private-endpoint-cosmosdb-portal"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Only whitelisted services will have access to communicate with the Cosmos DB."
}

View File

@@ -0,0 +1,21 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.cosmosdb.cosmosdb_client import cosmosdb_client
class cosmosdb_account_use_private_endpoints(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
for account in accounts:
report = Check_Report_Azure(self.metadata())
report.subscription = subscription
report.resource_name = account.name
report.resource_id = account.id
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using private endpoints connections"
if account.private_endpoint_connections:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using private endpoints connections"
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.azure.lib.audit_info.audit_info import azure_audit_info
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import CosmosDB
cosmosdb_client = CosmosDB(azure_audit_info)

View File

@@ -0,0 +1,53 @@
from dataclasses import dataclass
from azure.mgmt.cosmosdb import CosmosDBManagementClient
from azure.mgmt.cosmosdb.models import PrivateEndpointConnection
from prowler.lib.logger import logger
from prowler.providers.azure.lib.service.service import AzureService
class CosmosDB(AzureService):
def __init__(self, audit_info):
super().__init__(CosmosDBManagementClient, audit_info)
self.accounts = self.__get_accounts__()
def __get_accounts__(self):
logger.info("CosmosDB - Getting accounts...")
accounts = {}
for subscription, client in self.clients.items():
try:
accounts_list = client.database_accounts.list()
accounts.update({subscription: []})
for account in accounts_list:
accounts[subscription].append(
Account(
id=account.id,
name=account.name,
kind=account.kind,
location=account.location,
type=account.type,
tags=account.tags,
is_virtual_network_filter_enabled=account.is_virtual_network_filter_enabled,
private_endpoint_connections=account.private_endpoint_connections,
disable_local_auth=account.disable_local_auth,
)
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return accounts
@dataclass
class Account:
id: str
name: str
kind: str
location: str
type: str
tags: dict
is_virtual_network_filter_enabled: bool
private_endpoint_connections: list[PrivateEndpointConnection] = None
disable_local_auth: bool = False

View File

@@ -30,6 +30,7 @@ awsipranges = "0.3.3"
azure-identity = "1.15.0"
azure-mgmt-applicationinsights = "4.0.0"
azure-mgmt-authorization = "4.0.0"
azure-mgmt-cosmosdb = "9.4.0"
azure-mgmt-security = "6.0.0"
azure-mgmt-sql = "3.0.1"
azure-mgmt-storage = "21.1.0"

View File

@@ -171,6 +171,42 @@ expected_packages = [
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled.sqlserver_va_emails_notifications_admins_enabled",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder("/root_dir/prowler/providers/azure/services/cosmosdb"),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_firewall_use_selected_networks"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder("/root_dir/prowler/providers/azure/services/cosmosdb"),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_private_endpoints"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder("/root_dir/prowler/providers/azure/services/cosmosdb"),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_aad_and_rbac"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac",
ispkg=False,
),
]
@@ -320,6 +356,48 @@ def mock_list_modules(*_):
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled.sqlserver_va_emails_notifications_admins_enabled",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_firewall_use_selected_networks"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_private_endpoints"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints",
ispkg=False,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac",
ispkg=True,
),
ModuleInfo(
module_finder=FileFinder(
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_aad_and_rbac"
),
name="prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac",
ispkg=False,
),
]
return modules
@@ -729,6 +807,18 @@ class Test_Check:
"sqlserver_va_emails_notifications_admins_enabled",
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_emails_notifications_admins_enabled",
),
(
"cosmosdb_account_firewall_use_selected_networks",
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_firewall_use_selected_networks",
),
(
"cosmosdb_account_use_private_endpoints",
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_private_endpoints",
),
(
"cosmosdb_account_use_aad_and_rbac",
"/root_dir/prowler/providers/azure/services/cosmosdb/cosmosdb_account_use_aad_and_rbac",
),
]
returned_checks = recover_checks_from_provider(provider, service)
assert returned_checks == expected_checks

View File

@@ -0,0 +1,102 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import Account
AZURE_SUBSCRIPTION = str(uuid4())
class Test_cosmosdb_account_firewall_use_selected_networks:
def test_no_accounts(self):
cosmosdb_client = mock.MagicMock
cosmosdb_client.accounts = {}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks import (
cosmosdb_account_firewall_use_selected_networks,
)
check = cosmosdb_account_firewall_use_selected_networks()
result = check.execute()
assert len(result) == 0
def test_accounts_no_virtual_network_filter_enabled(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
disable_local_auth=None,
is_virtual_network_filter_enabled=False,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks import (
cosmosdb_account_firewall_use_selected_networks,
)
check = cosmosdb_account_firewall_use_selected_networks()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} has firewall rules that allow access from all networks."
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id
def test_accounts_virtual_network_filter_enabled(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
disable_local_auth=None,
is_virtual_network_filter_enabled=True,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_firewall_use_selected_networks.cosmosdb_account_firewall_use_selected_networks import (
cosmosdb_account_firewall_use_selected_networks,
)
check = cosmosdb_account_firewall_use_selected_networks()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} has firewall rules that allow access only from selected networks."
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id

View File

@@ -0,0 +1,104 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import Account
AZURE_SUBSCRIPTION = str(uuid4())
class Test_cosmosdb_account_use_aad_and_rbac:
def test_no_accounts(self):
cosmosdb_client = mock.MagicMock
cosmosdb_client.accounts = {}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac import (
cosmosdb_account_use_aad_and_rbac,
)
check = cosmosdb_account_use_aad_and_rbac()
result = check.execute()
assert len(result) == 0
def test_accounts_disable_local_auth_false(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
is_virtual_network_filter_enabled=None,
private_endpoint_connections=None,
disable_local_auth=False,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac import (
cosmosdb_account_use_aad_and_rbac,
)
check = cosmosdb_account_use_aad_and_rbac()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} is not using AAD and RBAC"
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id
def test_accounts_disable_local_auth_true(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
is_virtual_network_filter_enabled=None,
private_endpoint_connections=None,
disable_local_auth=True,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_aad_and_rbac.cosmosdb_account_use_aad_and_rbac import (
cosmosdb_account_use_aad_and_rbac,
)
check = cosmosdb_account_use_aad_and_rbac()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} is using AAD and RBAC"
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id

View File

@@ -0,0 +1,110 @@
from unittest import mock
from uuid import uuid4
from azure.mgmt.cosmosdb.models import PrivateEndpointConnection
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import Account
AZURE_SUBSCRIPTION = str(uuid4())
class Test_cosmosdb_account_use_private_endpoints:
def test_no_accounts(self):
cosmosdb_client = mock.MagicMock
cosmosdb_client.accounts = {}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints import (
cosmosdb_account_use_private_endpoints,
)
check = cosmosdb_account_use_private_endpoints()
result = check.execute()
assert len(result) == 0
def test_accounts_no_private_endpoints_connections(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
is_virtual_network_filter_enabled=None,
private_endpoint_connections=None,
disable_local_auth=None,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints import (
cosmosdb_account_use_private_endpoints,
)
check = cosmosdb_account_use_private_endpoints()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} is not using private endpoints connections"
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id
def test_accounts_private_endpoints_connections(self):
cosmosdb_client = mock.MagicMock
account_name = "Account Name"
account_id = str(uuid4())
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION: [
Account(
id=account_id,
name=account_name,
kind=None,
location=None,
type=None,
tags=None,
is_virtual_network_filter_enabled=None,
private_endpoint_connections=[
PrivateEndpointConnection(
id="private_endpoint", name="private_name"
)
],
disable_local_auth=None,
)
]
}
with mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints.cosmosdb_client",
new=cosmosdb_client,
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_use_private_endpoints.cosmosdb_account_use_private_endpoints import (
cosmosdb_account_use_private_endpoints,
)
check = cosmosdb_account_use_private_endpoints()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CosmosDB account {account_name} from subscription {AZURE_SUBSCRIPTION} is using private endpoints connections"
)
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == account_name
assert result[0].resource_id == account_id

View File

@@ -0,0 +1,52 @@
from unittest.mock import patch
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import Account, CosmosDB
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION,
set_mocked_azure_audit_info,
)
def mock_cosmosdb_get_accounts(_):
return {
AZURE_SUBSCRIPTION: [
Account(
id="account_id",
name="account_name",
kind=None,
location=None,
type=None,
tags=None,
is_virtual_network_filter_enabled=None,
disable_local_auth=None,
)
]
}
@patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_service.CosmosDB.__get_accounts__",
new=mock_cosmosdb_get_accounts,
)
class Test_CosmosDB_Service:
def test__get_client__(self):
account = CosmosDB(set_mocked_azure_audit_info())
assert (
account.clients[AZURE_SUBSCRIPTION].__class__.__name__
== "CosmosDBManagementClient"
)
def test__get_accounts__(self):
account = CosmosDB(set_mocked_azure_audit_info())
assert account.accounts[AZURE_SUBSCRIPTION][0].__class__.__name__ == "Account"
assert account.accounts[AZURE_SUBSCRIPTION][0].id == "account_id"
assert account.accounts[AZURE_SUBSCRIPTION][0].name == "account_name"
assert account.accounts[AZURE_SUBSCRIPTION][0].kind is None
assert account.accounts[AZURE_SUBSCRIPTION][0].location is None
assert account.accounts[AZURE_SUBSCRIPTION][0].type is None
assert account.accounts[AZURE_SUBSCRIPTION][0].tags is None
assert (
account.accounts[AZURE_SUBSCRIPTION][0].is_virtual_network_filter_enabled
is None
)
assert account.accounts[AZURE_SUBSCRIPTION][0].disable_local_auth is None