feat(azure): checks related with MySQL service (#3385)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Rubén De la Torre Vico
2024-02-16 10:40:41 +01:00
committed by GitHub
parent 8e93493d2b
commit af00c5382b
22 changed files with 1239 additions and 3 deletions

22
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -388,6 +388,22 @@ azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.2,<2.0.0"
isodate = ">=0.6.1,<1.0.0"
[[package]]
name = "azure-mgmt-rdbms"
version = "10.1.0"
description = "Microsoft Azure RDBMS Management Client Library for Python"
optional = false
python-versions = ">=3.6"
files = [
{file = "azure-mgmt-rdbms-10.1.0.zip", hash = "sha256:a87d401c876c84734cdd4888af551e4a1461b4b328d9816af60cb8ac5979f035"},
{file = "azure_mgmt_rdbms-10.1.0-py3-none-any.whl", hash = "sha256:8eac17d1341a91d7ed914435941ba917b5ef1568acabc3e65653603966a7cc88"},
]
[package.dependencies]
azure-common = ">=1.1,<2.0"
azure-mgmt-core = ">=1.3.0,<2.0.0"
msrest = ">=0.6.21"
[[package]]
name = "azure-mgmt-security"
version = "6.0.0"
@@ -2919,8 +2935,8 @@ astroid = ">=3.0.1,<=3.1.0-dev0"
colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""}
dill = [
{version = ">=0.2", markers = "python_version < \"3.11\""},
{version = ">=0.3.6", markers = "python_version >= \"3.11\""},
{version = ">=0.3.7", markers = "python_version >= \"3.12\""},
{version = ">=0.3.6", markers = "python_version >= \"3.11\" and python_version < \"3.12\""},
]
isort = ">=4.2.5,<5.13.0 || >5.13.0,<6"
mccabe = ">=0.6,<0.8"
@@ -4244,4 +4260,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.13"
content-hash = "fd58c1fcca6b5b3352792e5f89b369375574c962ea4624db322de5be4ce2177c"
content-hash = "6b4ca7e2d527bbcbbe0e2686012160658bcf88b25493817993bcbb50c2b4f5bc"

View File

@@ -0,0 +1,4 @@
from prowler.providers.azure.lib.audit_info.audit_info import azure_audit_info
from prowler.providers.azure.services.mysql.mysql_service import MySQL
mysql_client = MySQL(azure_audit_info)

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "mysql_flexible_server_audit_log_connection_activated",
"CheckTitle": "Ensure server parameter 'audit_log_events' has 'CONNECTION' set for MySQL Database Server",
"CheckType": [],
"ServiceName": "mysql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.DBforMySQL/flexibleServers",
"Description": "Set audit_log_enabled to include CONNECTION on MySQL Servers.",
"Risk": "Enabling CONNECTION helps MySQL Database to log items such as successful and failed connection attempts to the server. Log data can be used to identify, troubleshoot, and repair configuration errors and suboptimal performance.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/mysql/single-server/how-to-configure-audit-logs-portal",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.tenable.com/audits/items/CIS_Microsoft_Azure_Foundations_v2.0.0_L2.audit:06ec721d4c0ea9169db2b0c6876c5f38",
"Terraform": ""
},
"Recommendation": {
"Text": "1. From Azure Home select the Portal Menu. 2. Select Azure Database for MySQL servers. 3. Select a database. 4. Under Settings, select Server parameters. 5. Update audit_log_enabled parameter to ON. 6. Update audit_log_events parameter to have at least CONNECTION checked. 7. Click Save. 8. Under Monitoring, select Diagnostic settings. 9. Select + Add diagnostic setting. 10. Provide a diagnostic setting name. 11. Under Categories, select MySQL Audit Logs. 12. Specify destination details. 13. Click Save.",
"Url": "https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-3-enable-logging-for-security-investigation"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "There are further costs incurred for storage of logs. For high traffic databases these logs will be significant. Determine your organization's needs before enabling."
}

View File

@@ -0,0 +1,37 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.mysql.mysql_client import mysql_client
class mysql_flexible_server_audit_log_connection_activated(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
servers,
) in mysql_client.flexible_servers.items():
for (
server_name,
server,
) in servers.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = server_name
report.resource_id = server_name
report.status_extended = f"Audit log is disabled for server {server_name} in subscription {subscription_name}."
if "audit_log_events" in server.configurations:
report.resource_id = server.configurations[
"audit_log_events"
].resource_id
if "CONNECTION" in server.configurations[
"audit_log_events"
].value.split(","):
report.status = "PASS"
report.status_extended = f"Audit log is enabled for server {server_name} in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "mysql_flexible_server_audit_log_enabled",
"CheckTitle": "Ensure server parameter 'audit_log_enabled' is set to 'ON' for MySQL Database Server",
"CheckType": [],
"ServiceName": "mysql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Microsoft.DBforMySQL/flexibleServers",
"Description": "Enable audit_log_enabled on MySQL Servers.",
"Risk": "Enabling audit_log_enabled helps MySQL Database to log items such as connection attempts to the server, DDL/DML access, and more. Log data can be used to identify, troubleshoot, and repair configuration errors and suboptimal performance.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/mysql/single-server/how-to-configure-audit-logs-portal",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.tenable.com/audits/items/CIS_Microsoft_Azure_Foundations_v1.5.0_L2.audit:c073639a1ce546b535ba73afbf6542aa",
"Terraform": ""
},
"Recommendation": {
"Text": "1. Login to Azure Portal using https://portal.azure.com. 2. Select Azure Database for MySQL Servers. 3. Select a database. 4. Under Settings, select Server parameters. 5. Update audit_log_enabled parameter to ON 6. Under Monitoring, select Diagnostic settings. 7. Select + Add diagnostic setting. 8. Provide a diagnostic setting name. 9. Under Categories, select MySQL Audit Logs. 10. Specify destination details. 11. Click Save.",
"Url": "https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-logging-threat-detection#lt-3-enable-logging-for-security-investigation"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,35 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.mysql.mysql_client import mysql_client
class mysql_flexible_server_audit_log_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
servers,
) in mysql_client.flexible_servers.items():
for (
server_name,
server,
) in servers.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = server_name
report.resource_id = server_name
report.status_extended = f"Audit log is disabled for server {server_name} in subscription {subscription_name}."
if "audit_log_enabled" in server.configurations:
report.resource_id = server.configurations[
"audit_log_enabled"
].resource_id
if server.configurations["audit_log_enabled"].value == "ON":
report.status = "PASS"
report.status_extended = f"Audit log is enabled for server {server_name} in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "mysql_flexible_server_minimum_tls_version_12",
"CheckTitle": "Ensure 'TLS Version' is set to 'TLSV1.2' for MySQL flexible Database Server",
"CheckType": [],
"ServiceName": "mysql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.DBforMySQL/flexibleServers",
"Description": "Ensure TLS version on MySQL flexible servers is set to the default value.",
"Risk": "TLS connectivity helps to provide a new layer of security by connecting database server to client applications using Transport Layer Security (TLS). Enforcing TLS connections between database server and client applications helps protect against 'man in the middle' attacks by encrypting the data stream between the server and application.",
"RelatedUrl": "https://docs.microsoft.com/en-us/azure/mysql/concepts-ssl-connection-security",
"Remediation": {
"Code": {
"CLI": "az mysql flexible-server parameter set --name tls_version --resource-group <resourceGroupName> --server-name <serverName> --value TLSV1.2",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/MySQL/mysql-flexible-server-tls-version.html",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-mysql-is-using-the-latest-version-of-tls-encryption#terraform"
},
"Recommendation": {
"Text": "1. Login to Azure Portal using https://portal.azure.com 2. Go to Azure Database for MySQL flexible servers 3. For each database, click on Server parameters under Settings 4. In the search box, type in tls_version 5. Click on the VALUE dropdown, and ensure only TLSV1.2 is selected for tls_version",
"Url": "https://docs.microsoft.com/en-us/azure/mysql/howto-configure-ssl"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.mysql.mysql_client import mysql_client
class mysql_flexible_server_minimum_tls_version_12(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
servers,
) in mysql_client.flexible_servers.items():
for (
server_name,
server,
) in servers.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = server_name
report.resource_id = server_name
report.status_extended = f"TLS version is not configured in server {server_name} in subscription {subscription_name}."
if "tls_version" in server.configurations:
report.status = "PASS"
report.resource_id = server.configurations[
"tls_version"
].resource_id
report.status_extended = f"TLS version is {server.configurations['tls_version'].value} in server {server_name} in subscription {subscription_name}. This version of TLS is considered secure."
tls_aviable = server.configurations["tls_version"].value.split(",")
if "TLSv1.0" in tls_aviable or "TLSv1.1" in tls_aviable:
report.status = "FAIL"
report.status_extended = f"TLS version is {server.configurations['tls_version'].value} in server {server_name} in subscription {subscription_name}. There is at leat one version of TLS that is considered insecure."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "mysql_flexible_server_ssl_connection_enabled",
"CheckTitle": "Ensure 'Enforce SSL connection' is set to 'Enabled' for Standard MySQL Database Server",
"CheckType": [],
"ServiceName": "mysql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Microsoft.DBforMySQL/flexibleServers",
"Description": "Enable SSL connection on MYSQL Servers.",
"Risk": "SSL connectivity helps to provide a new layer of security by connecting database server to client applications using Secure Sockets Layer (SSL). Enforcing SSL connections between database server and client applications helps protect against 'man in the middle' attacks by encrypting the data stream between the server and application.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/mysql/single-server/concepts-ssl-connection-security",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.tenable.com/policies/[type]/AC_AZURE_0131",
"Terraform": ""
},
"Recommendation": {
"Text": "1. Login to Azure Portal using https://portal.azure.com 2. Go to Azure Database for MySQL servers 3. For each database, click on Connection security 4. In SSL settings, click on ENABLED to Enforce SSL connections",
"Url": "https://docs.microsoft.com/en-us/azure/mysql/single-server/how-to-configure-ssl"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,34 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.mysql.mysql_client import mysql_client
class mysql_flexible_server_ssl_connection_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for (
subscription_name,
servers,
) in mysql_client.flexible_servers.items():
for (
server_name,
server,
) in servers.items():
report = Check_Report_Azure(self.metadata())
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = server_name
report.resource_id = server_name
report.status_extended = f"SSL connection is disabled for server {server_name} in subscription {subscription_name}."
if "require_secure_transport" in server.configurations:
report.resource_id = server.configurations[
"require_secure_transport"
].resource_id
if server.configurations["require_secure_transport"].value == "ON":
report.status = "PASS"
report.status_extended = f"SSL connection is enabled for server {server_name} in subscription {subscription_name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,78 @@
from dataclasses import dataclass
from azure.mgmt.rdbms.mysql_flexibleservers import MySQLManagementClient
from prowler.lib.logger import logger
from prowler.providers.azure.lib.service.service import AzureService
########################## MySQL
class MySQL(AzureService):
def __init__(self, audit_info):
super().__init__(MySQLManagementClient, audit_info)
self.flexible_servers = self.__get_flexible_servers__()
def __get_flexible_servers__(self):
logger.info("MySQL - Getting servers...")
servers = {}
for subscription_name, client in self.clients.items():
try:
servers_list = client.servers.list()
servers.update({subscription_name: {}})
for server in servers_list:
servers[subscription_name].update(
{
server.name: FlexibleServer(
resource_id=server.id,
location=server.location,
version=server.version,
configurations=self.__get_configurations__(
client, server.id.split("/")[4], server.name
),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return servers
def __get_configurations__(self, client, resource_group, server_name):
logger.info(f"MySQL - Getting configurations from server {server_name} ...")
configurations = {}
try:
configurations_list = client.configurations.list_by_server(
resource_group, server_name
)
for configuration in configurations_list:
configurations.update(
{
configuration.name: Configuration(
resource_id=configuration.id,
description=configuration.description,
value=configuration.value,
)
}
)
except Exception as error:
logger.error(
f"Server name: {server_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return configurations
@dataclass
class Configuration:
resource_id: str
description: str
value: str
@dataclass
class FlexibleServer:
resource_id: str
location: str
version: str
configurations: dict[Configuration]

View File

@@ -30,6 +30,7 @@ awsipranges = "0.3.3"
azure-identity = "1.15.0"
azure-mgmt-applicationinsights = "4.0.0"
azure-mgmt-authorization = "4.0.0"
azure-mgmt-rdbms = "10.1.0"
azure-mgmt-cosmosdb = "9.4.0"
azure-mgmt-security = "6.0.0"
azure-mgmt-sql = "3.0.1"

View File

@@ -0,0 +1,171 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.mysql.mysql_service import (
Configuration,
FlexibleServer,
)
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_mysql_flexible_server_audit_log_connection_activated:
def test_mysql_no_subscriptions(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated import (
mysql_flexible_server_audit_log_connection_activated,
)
check = mysql_flexible_server_audit_log_connection_activated()
result = check.execute()
assert len(result) == 0
def test_mysql_no_servers(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated import (
mysql_flexible_server_audit_log_connection_activated,
)
check = mysql_flexible_server_audit_log_connection_activated()
result = check.execute()
assert len(result) == 0
def test_mysql_audit_log_connection_not_connection(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"audit_log_events": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/audit_log_events",
description="description",
value="ADMIN,DDL",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated import (
mysql_flexible_server_audit_log_connection_activated,
)
check = mysql_flexible_server_audit_log_connection_activated()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/audit_log_events"
)
assert (
result[0].status_extended
== f"Audit log is disabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_audit_log_connection_activated(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"audit_log_events": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/audit_log_events",
description="description",
value="CONNECTION",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated import (
mysql_flexible_server_audit_log_connection_activated,
)
check = mysql_flexible_server_audit_log_connection_activated()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/audit_log_events"
)
assert (
result[0].status_extended
== f"Audit log is enabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_audit_log_connection_activated_with_other_options(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"audit_log_events": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/audit_log_events",
description="description",
value="ADMIN,GENERAL,CONNECTION,DDL",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_connection_activated.mysql_flexible_server_audit_log_connection_activated import (
mysql_flexible_server_audit_log_connection_activated,
)
check = mysql_flexible_server_audit_log_connection_activated()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/audit_log_events"
)
assert (
result[0].status_extended
== f"Audit log is enabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)

View File

@@ -0,0 +1,128 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.mysql.mysql_service import (
Configuration,
FlexibleServer,
)
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_mysql_flexible_server_audit_log_enabled:
def test_mysql_no_subscriptions(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled import (
mysql_flexible_server_audit_log_enabled,
)
check = mysql_flexible_server_audit_log_enabled()
result = check.execute()
assert len(result) == 0
def test_mysql_no_servers(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled import (
mysql_flexible_server_audit_log_enabled,
)
check = mysql_flexible_server_audit_log_enabled()
result = check.execute()
assert len(result) == 0
def test_mysql_audit_log_disabled(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"audit_log_enabled": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/audit_log_enabled",
description="description",
value="OFF",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled import (
mysql_flexible_server_audit_log_enabled,
)
check = mysql_flexible_server_audit_log_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/audit_log_enabled"
)
assert (
result[0].status_extended
== f"Audit log is disabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_audit_log_enabled(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"audit_log_enabled": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/audit_log_enabled",
description="description",
value="ON",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_audit_log_enabled.mysql_flexible_server_audit_log_enabled import (
mysql_flexible_server_audit_log_enabled,
)
check = mysql_flexible_server_audit_log_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/audit_log_enabled"
)
assert (
result[0].status_extended
== f"Audit log is enabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)

View File

@@ -0,0 +1,243 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.mysql.mysql_service import (
Configuration,
FlexibleServer,
)
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_mysql_flexible_server_minimum_tls_version_12:
def test_mysql_no_subscriptions(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 0
def test_mysql_no_servers(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 0
def test_mysql_no_tls_configuration(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert result[0].resource_id == server_name
assert (
result[0].status_extended
== f"TLS version is not configured in server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_flexible_server_minimum_tls_version_12(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"tls_version": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/tls_version",
description="description",
value="TLSv1.2",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/tls_version"
)
assert (
result[0].status_extended
== f"TLS version is TLSv1.2 in server {server_name} in subscription {AZURE_SUBSCRIPTION}. This version of TLS is considered secure."
)
def test_mysql_tls_version_is_1_3(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"tls_version": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/tls_version",
description="description",
value="TLSv1.3",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/tls_version"
)
assert (
result[0].status_extended
== f"TLS version is TLSv1.3 in server {server_name} in subscription {AZURE_SUBSCRIPTION}. This version of TLS is considered secure."
)
def test_mysql_tls_version_is_not_1_2(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"tls_version": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/tls_version",
description="description",
value="TLSv1.1",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/tls_version"
)
assert (
result[0].status_extended
== f"TLS version is TLSv1.1 in server {server_name} in subscription {AZURE_SUBSCRIPTION}. There is at leat one version of TLS that is considered insecure."
)
def test_mysql_tls_version_is_1_1_and_1_3(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"tls_version": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/tls_version",
description="description",
value="TLSv1.1,TLSv1.3",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_minimum_tls_version_12.mysql_flexible_server_minimum_tls_version_12 import (
mysql_flexible_server_minimum_tls_version_12,
)
check = mysql_flexible_server_minimum_tls_version_12()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/tls_version"
)
assert (
result[0].status_extended
== f"TLS version is TLSv1.1,TLSv1.3 in server {server_name} in subscription {AZURE_SUBSCRIPTION}. There is at leat one version of TLS that is considered insecure."
)

View File

@@ -0,0 +1,229 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.mysql.mysql_service import (
Configuration,
FlexibleServer,
)
from tests.providers.azure.azure_fixtures import AZURE_SUBSCRIPTION
class Test_mysql_flexible_server_ssl_connection_enabled:
def test_mysql_no_subscriptions(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 0
def test_mysql_no_servers(self):
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {AZURE_SUBSCRIPTION: {}}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 0
def test_mysql_connection_enabled(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"require_secure_transport": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/require_secure_transport",
description="description",
value="ON",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/require_secure_transport"
)
assert (
result[0].status_extended
== f"SSL connection is enabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_ssl_connection_disabled(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"require_secure_transport": Configuration(
resource_id=f"/subscriptions/{server_name}/configurations/require_secure_transport",
description="description",
value="OFF",
)
},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert (
result[0].resource_id
== f"/subscriptions/{server_name}/configurations/require_secure_transport"
)
assert (
result[0].status_extended
== f"SSL connection is disabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_ssl_connection_no_configuration(self):
server_name = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={},
)
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name
assert result[0].resource_id == server_name
assert (
result[0].status_extended
== f"SSL connection is disabled for server {server_name} in subscription {AZURE_SUBSCRIPTION}."
)
def test_mysql_ssl_connection_enabled_and_disabled(self):
server_name_1 = str(uuid4())
server_name_2 = str(uuid4())
mysql_client = mock.MagicMock
mysql_client.flexible_servers = {
AZURE_SUBSCRIPTION: {
server_name_1: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"require_secure_transport": Configuration(
resource_id=f"/subscriptions/{server_name_1}/configurations/require_secure_transport",
description="description",
value="ON",
)
},
),
server_name_2: FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"require_secure_transport": Configuration(
resource_id=f"/subscriptions/{server_name_2}/configurations/require_secure_transport",
description="description",
value="OFF",
)
},
),
}
}
with mock.patch(
"prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled.mysql_client",
new=mysql_client,
):
from prowler.providers.azure.services.mysql.mysql_flexible_server_ssl_connection_enabled.mysql_flexible_server_ssl_connection_enabled import (
mysql_flexible_server_ssl_connection_enabled,
)
check = mysql_flexible_server_ssl_connection_enabled()
result = check.execute()
assert len(result) == 2
assert result[0].status == "PASS"
assert result[0].subscription == AZURE_SUBSCRIPTION
assert result[0].resource_name == server_name_1
assert (
result[0].resource_id
== f"/subscriptions/{server_name_1}/configurations/require_secure_transport"
)
assert (
result[0].status_extended
== f"SSL connection is enabled for server {server_name_1} in subscription {AZURE_SUBSCRIPTION}."
)
assert result[1].status == "FAIL"
assert result[1].subscription == AZURE_SUBSCRIPTION
assert result[1].resource_name == server_name_2
assert (
result[1].resource_id
== f"/subscriptions/{server_name_2}/configurations/require_secure_transport"
)
assert (
result[1].status_extended
== f"SSL connection is disabled for server {server_name_2} in subscription {AZURE_SUBSCRIPTION}."
)

View File

@@ -0,0 +1,101 @@
from unittest.mock import patch
from prowler.providers.azure.services.mysql.mysql_service import (
Configuration,
FlexibleServer,
MySQL,
)
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION,
set_mocked_azure_audit_info,
)
def mock_mysql_get_servers(_):
return {
AZURE_SUBSCRIPTION: {
"test": FlexibleServer(
resource_id="/subscriptions/resource_id",
location="location",
version="version",
configurations={
"test": Configuration(
resource_id="/subscriptions/test/resource_id",
description="description",
value="value",
)
},
)
}
}
def mock_mysql_get_configurations(_):
return {
"test": Configuration(
resource_id="/subscriptions/resource_id",
description="description",
value="value",
)
}
@patch(
"prowler.providers.azure.services.mysql.mysql_service.MySQL.__get_flexible_servers__",
new=mock_mysql_get_servers,
)
@patch(
"prowler.providers.azure.services.mysql.mysql_service.MySQL.__get_configurations__",
new=mock_mysql_get_configurations,
)
class Test_MySQL_Service:
def test__get_client__(self):
mysql = MySQL(set_mocked_azure_audit_info())
assert (
mysql.clients[AZURE_SUBSCRIPTION].__class__.__name__
== "MySQLManagementClient"
)
def test__get_subscriptions__(self):
mysql = MySQL(set_mocked_azure_audit_info())
assert mysql.subscriptions.__class__.__name__ == "dict"
def test__get_flexible_servers__(self):
mysql = MySQL(set_mocked_azure_audit_info())
assert len(mysql.flexible_servers) == 1
assert (
mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"].resource_id
== "/subscriptions/resource_id"
)
assert mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"].location == "location"
assert mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"].version == "version"
assert (
len(mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"].configurations) == 1
)
assert (
mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"]
.configurations["test"]
.resource_id
== "/subscriptions/test/resource_id"
)
assert (
mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"]
.configurations["test"]
.description
== "description"
)
assert (
mysql.flexible_servers[AZURE_SUBSCRIPTION]["test"]
.configurations["test"]
.value
== "value"
)
def test__get_configurations__(self):
mysql = MySQL(set_mocked_azure_audit_info())
configurations = mysql.__get_configurations__()
assert len(configurations) == 1
assert configurations["test"].resource_id == "/subscriptions/resource_id"
assert configurations["test"].description == "description"
assert configurations["test"].value == "value"