feat(rds_instance_transport_encrypted): add new check (#1963)

Co-authored-by: Toni de la Fuente <toni@blyx.com>
This commit is contained in:
Sergio Garcia
2023-03-06 13:18:41 +01:00
committed by GitHub
parent 90ebbfc20f
commit c5a42cf5de
6 changed files with 264 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "rds_instance_transport_encrypted",
"CheckTitle": "Check if RDS instances client connections are encrypted (Microsoft SQL Server and PostgreSQL).",
"CheckType": [],
"ServiceName": "rds",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:rds:region:account-id:db-instance",
"Severity": "high",
"ResourceType": "AwsRdsDbInstance",
"Description": "Check if RDS instances client connections are encrypted (Microsoft SQL Server and PostgreSQL).",
"Risk": "If not enabled sensitive information at transit is not protected.",
"RelatedUrl": "https://aws.amazon.com/premiumsupport/knowledge-center/rds-connect-ssl-connection/",
"Remediation": {
"Code": {
"CLI": "aws rds modify-db-parameter-group --region <REGION_NAME> --db-parameter-group-name <PARAMETER_GROUP_NAME> --parameters ParameterName='rds.force_ssl',ParameterValue='1',ApplyMethod='pending-reboot'",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/RDS/transport-encryption.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that Microsoft SQL Server and PostgreSQL instances provisioned with Amazon RDS have Transport Encryption feature enabled in order to meet security and compliance requirements.",
"Url": "https://aws.amazon.com/premiumsupport/knowledge-center/rds-connect-ssl-connection/"
}
},
"Categories": [ "encryption" ],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,29 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_transport_encrypted(Check):
def execute(self):
findings = []
supported_engines = ["sqlserver", "postgres"]
for db_instance in rds_client.db_instances:
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.status = "FAIL"
report.status_extended = (
f"RDS Instance {db_instance.id} connections are not encrypted."
)
# Check only RDS SQL Server or PostgreSQL engines
if any(engine in db_instance.engine for engine in supported_engines):
for parameter in db_instance.parameters:
if (
parameter["ParameterName"] == "rds.force_ssl"
and parameter["ParameterValue"] == "1"
):
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} connections use SSL encryption."
findings.append(report)
return findings

View File

@@ -20,6 +20,7 @@ class RDS:
self.db_snapshots = []
self.db_cluster_snapshots = []
self.__threading_call__(self.__describe_db_instances__)
self.__threading_call__(self.__describe_db_parameters__)
self.__threading_call__(self.__describe_db_snapshots__)
self.__threading_call__(self.__describe_db_snapshot_attributes__)
self.__threading_call__(self.__describe_db_cluster_snapshots__)
@@ -72,6 +73,10 @@ class RDS:
enhanced_monitoring_arn=instance.get(
"EnhancedMonitoringResourceArn"
),
parameter_groups=[
item["DBParameterGroupName"]
for item in instance["DBParameterGroups"]
],
multi_az=instance["MultiAZ"],
region=regional_client.region,
tags=instance.get("TagList"),
@@ -82,6 +87,26 @@ class RDS:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_db_parameters__(self, regional_client):
logger.info("RDS - Describe DB Parameters...")
try:
for instance in self.db_instances:
if instance.region == regional_client.region:
for parameter_group in instance.parameter_groups:
describe_db_parameters_paginator = (
regional_client.get_paginator("describe_db_parameters")
)
for page in describe_db_parameters_paginator.paginate(
DBParameterGroupName=parameter_group
):
for parameter in page["Parameters"]:
instance.parameters.append(parameter)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_db_snapshots__(self, regional_client):
logger.info("RDS - Describe Snapshots...")
try:
@@ -185,6 +210,8 @@ class DBInstance(BaseModel):
auto_minor_version_upgrade: bool
enhanced_monitoring_arn: Optional[str]
multi_az: bool
parameter_groups: list[str] = []
parameters: list[dict] = []
region: str
tags: Optional[list] = []

View File

@@ -0,0 +1,134 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_rds
AWS_REGION = "us-east-1"
class Test_rds_instance_transport_encrypted:
@mock_rds
def test_rds_no_instances(self):
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)
check = rds_instance_transport_encrypted()
result = check.execute()
assert len(result) == 0
@mock_rds
def test_rds_instance_no_ssl(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)
conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "0",
"ApplyMethod": "immediate",
},
],
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)
check = rds_instance_transport_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"connections are not encrypted",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"
@mock_rds
def test_rds_instance_with_ssl(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)
conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "1",
"ApplyMethod": "immediate",
},
],
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.rds.rds_service import RDS
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted.rds_client",
new=RDS(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_transport_encrypted.rds_instance_transport_encrypted import (
rds_instance_transport_encrypted,
)
check = rds_instance_transport_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"connections use SSL encryption",
result[0].status_extended,
)
assert result[0].resource_id == "db-master-1"

View File

@@ -69,6 +69,11 @@ class Test_RDS_Service:
@mock_rds
def test__describe_db_instances__(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
@@ -82,6 +87,7 @@ class Test_RDS_Service:
BackupRetentionPeriod=10,
EnableCloudwatchLogsExports=["audit", "error"],
MultiAZ=True,
DBParameterGroupName="test",
Tags=[
{"Key": "test", "Value": "test"},
],
@@ -107,6 +113,44 @@ class Test_RDS_Service:
assert rds.db_instances[0].tags == [
{"Key": "test", "Value": "test"},
]
assert "test" in rds.db_instances[0].parameter_groups
@mock_rds
def test__describe_db_parameters__(self):
conn = client("rds", region_name=AWS_REGION)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.postgres9.3",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
)
conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "rds.force_ssl",
"ParameterValue": "1",
"ApplyMethod": "immediate",
},
],
)
# RDS client for this test class
audit_info = self.set_mocked_audit_info()
rds = RDS(audit_info)
assert len(rds.db_instances) == 1
assert rds.db_instances[0].id == "db-master-1"
assert rds.db_instances[0].region == AWS_REGION
for parameter in rds.db_instances[0].parameters:
if parameter["ParameterName"] == "rds.force_ssl":
assert parameter["ParameterValue"] == "1"
# Test RDS Describe DB Snapshots
@mock_rds