fix(rds): check configurations for DB instances at cluster level (#2277)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-04-26 13:51:07 +02:00
committed by GitHub
parent 6e84f517a9
commit ac6272e739
6 changed files with 374 additions and 19 deletions

View File

@@ -98,7 +98,7 @@ class Test_rds_instance_deletion_protection:
assert result[0].resource_id == "db-master-1"
@mock_rds
def test_rds_instance_with_encryption(self):
def test_rds_instance_with_deletion_protection(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
@@ -136,3 +136,107 @@ class Test_rds_instance_deletion_protection:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
@mock_rds
def test_rds_instance_without_cluster_deletion_protection(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_cluster(
DBClusterIdentifier="db-cluster-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DeletionProtection=False,
MasterUsername="test",
MasterUserPassword="password",
Tags=[
{"Key": "test", "Value": "test"},
],
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBClusterIdentifier="db-cluster-1",
)
from prowler.providers.aws.services.rds.rds_service import RDS
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client",
new=RDS(audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import (
rds_instance_deletion_protection,
)
check = rds_instance_deletion_protection()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"deletion protection is not enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
@mock_rds
def test_rds_instance_with_cluster_deletion_protection(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_cluster(
DBClusterIdentifier="db-cluster-1",
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
DeletionProtection=True,
MasterUsername="test",
MasterUserPassword="password",
Tags=[
{"Key": "test", "Value": "test"},
],
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBClusterIdentifier="db-cluster-1",
)
from prowler.providers.aws.services.rds.rds_service import RDS
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection.rds_client",
new=RDS(audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_deletion_protection.rds_instance_deletion_protection import (
rds_instance_deletion_protection,
)
check = rds_instance_deletion_protection()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"deletion protection is enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"

View File

@@ -5,6 +5,7 @@ from boto3 import client, session
from moto import mock_rds
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.rds.rds_service import DBCluster, DBInstance
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
@@ -136,3 +137,123 @@ class Test_rds_instance_multi_az:
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
def test_rds_instance_in_cluster_multi_az(self):
rds_client = mock.MagicMock
rds_client.db_clusters = {
"test-cluster": DBCluster(
id="test-cluster",
endpoint="",
engine="aurora",
status="available",
public=False,
encrypted=False,
auto_minor_version_upgrade=False,
backup_retention_period=0,
cloudwatch_logs=[],
deletion_protection=False,
parameter_group="",
multi_az=True,
region=AWS_REGION,
tags=[],
)
}
rds_client.db_instances = [
DBInstance(
id="test-instance",
endpoint="",
engine="aurora",
status="available",
public=False,
encrypted=False,
auto_minor_version_upgrade=False,
backup_retention_period=0,
cloudwatch_logs=[],
deletion_protection=False,
parameter_group=[],
multi_az=False,
cluster_id="test-cluster",
region=AWS_REGION,
tags=[],
)
]
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
check = rds_instance_multi_az()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"
def test_rds_instance_in_cluster_without_multi_az(self):
rds_client = mock.MagicMock
rds_client.db_clusters = {
"test-cluster": DBCluster(
id="test-cluster",
endpoint="",
engine="aurora",
status="available",
public=False,
encrypted=False,
auto_minor_version_upgrade=False,
backup_retention_period=0,
cloudwatch_logs=[],
deletion_protection=False,
parameter_group="",
multi_az=False,
region=AWS_REGION,
tags=[],
)
}
rds_client.db_instances = [
DBInstance(
id="test-instance",
endpoint="",
engine="aurora",
status="available",
public=False,
encrypted=False,
auto_minor_version_upgrade=False,
backup_retention_period=0,
cloudwatch_logs=[],
deletion_protection=False,
parameter_group=[],
multi_az=False,
cluster_id="test-cluster",
region=AWS_REGION,
tags=[],
)
]
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az.rds_client",
new=rds_client,
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_multi_az.rds_instance_multi_az import (
rds_instance_multi_az,
)
check = rds_instance_multi_az()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have multi-AZ enabled at cluster",
result[0].status_extended,
)
assert result[0].resource_id == "test-instance"

View File

@@ -176,6 +176,55 @@ class Test_RDS_Service:
assert rds.db_snapshots[0].region == AWS_REGION
assert not rds.db_snapshots[0].public
# Test RDS Describe DB Clusters
@mock_rds
def test__describe_db_clusters__(self):
conn = client("rds", region_name=AWS_REGION)
cluster_id = "db-master-1"
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_cluster(
DBClusterIdentifier=cluster_id,
AllocatedStorage=10,
Engine="postgres",
DatabaseName="staging-postgres",
StorageEncrypted=False,
DeletionProtection=True,
PubliclyAccessible=False,
AutoMinorVersionUpgrade=False,
BackupRetentionPeriod=1,
MasterUsername="test",
MasterUserPassword="password",
EnableCloudwatchLogsExports=["audit", "error"],
DBClusterParameterGroupName="test",
Tags=[
{"Key": "test", "Value": "test"},
],
)
# RDS client for this test class
audit_info = self.set_mocked_audit_info()
rds = RDS(audit_info)
assert len(rds.db_clusters) == 1
assert rds.db_clusters[cluster_id].id == "db-master-1"
assert rds.db_clusters[cluster_id].engine == "postgres"
assert rds.db_clusters[cluster_id].region == AWS_REGION
assert f"{AWS_REGION}.rds.amazonaws.com" in rds.db_clusters[cluster_id].endpoint
assert rds.db_clusters[cluster_id].status == "available"
assert not rds.db_clusters[cluster_id].public
assert not rds.db_clusters[cluster_id].encrypted
assert rds.db_clusters[cluster_id].backup_retention_period == 1
assert rds.db_clusters[cluster_id].cloudwatch_logs == ["audit", "error"]
assert rds.db_clusters[cluster_id].deletion_protection
assert not rds.db_clusters[cluster_id].auto_minor_version_upgrade
assert not rds.db_clusters[cluster_id].multi_az
assert rds.db_clusters[cluster_id].tags == [
{"Key": "test", "Value": "test"},
]
assert rds.db_clusters[cluster_id].parameter_group == "test"
# Test RDS Describe DB Cluster Snapshots
@mock_rds
def test__describe_db_cluster_snapshots__(self):