From db15c0de9e0bc75b2dc4aaa9878cc3efdb8c5cf8 Mon Sep 17 00:00:00 2001 From: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Date: Tue, 6 Feb 2024 10:49:58 +0100 Subject: [PATCH] fix(rds): verify SGs in `rds_instance_no_public_access` (#3341) Co-authored-by: Pepe Fagoaga --- .../rds_instance_no_public_access.py | 29 ++-- .../providers/aws/services/rds/rds_service.py | 6 + .../rds_instance_no_public_access_test.py | 136 +++++++++++++++++- 3 files changed, 157 insertions(+), 14 deletions(-) diff --git a/prowler/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access.py b/prowler/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access.py index a5e2cd29..8040302d 100644 --- a/prowler/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access.py +++ b/prowler/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access.py @@ -1,4 +1,5 @@ from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.ec2.ec2_client import ec2_client from prowler.providers.aws.services.rds.rds_client import rds_client @@ -11,17 +12,23 @@ class rds_instance_no_public_access(Check): report.resource_id = db_instance.id report.resource_arn = db_instance.arn report.resource_tags = db_instance.tags - if not db_instance.public: - report.status = "PASS" - report.status_extended = ( - f"RDS Instance {db_instance.id} is not Publicly Accessible." - ) - else: - report.status = "FAIL" - report.status_extended = ( - f"RDS Instance {db_instance.id} is set as Publicly Accessible." - ) - + report.status = "PASS" + report.status_extended = ( + f"RDS Instance {db_instance.id} is not publicly accessible." + ) + if db_instance.public: + # Check if any DB Instance Security Group is publicly open + if db_instance.security_groups: + report.status = "PASS" + report.status_extended = f"RDS Instance {db_instance.id} is public but filtered with security groups." + for security_group in ec2_client.security_groups: + if ( + security_group.id in db_instance.security_groups + and security_group.public_ports + ): + report.status = "FAIL" + report.status_extended = f"RDS Instance {db_instance.id} is set as publicly accessible." + break findings.append(report) return findings diff --git a/prowler/providers/aws/services/rds/rds_service.py b/prowler/providers/aws/services/rds/rds_service.py index 82907366..6a8a5c3a 100644 --- a/prowler/providers/aws/services/rds/rds_service.py +++ b/prowler/providers/aws/services/rds/rds_service.py @@ -68,6 +68,11 @@ class RDS(AWSService): for item in instance["DBParameterGroups"] ], multi_az=instance["MultiAZ"], + security_groups=[ + sg["VpcSecurityGroupId"] + for sg in instance["VpcSecurityGroups"] + if sg["Status"] == "active" + ], cluster_id=instance.get("DBClusterIdentifier"), cluster_arn=f"arn:{self.audited_partition}:rds:{regional_client.region}:{self.audited_account}:cluster:{instance.get('DBClusterIdentifier')}", region=regional_client.region, @@ -295,6 +300,7 @@ class DBInstance(BaseModel): multi_az: bool parameter_groups: list[str] = [] parameters: list[dict] = [] + security_groups: list[str] = [] cluster_id: Optional[str] cluster_arn: Optional[str] region: str diff --git a/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py b/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py index 16dc02b0..a4823663 100644 --- a/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py +++ b/tests/providers/aws/services/rds/rds_instance_no_public_access/rds_instance_no_public_access_test.py @@ -89,7 +89,7 @@ class Test_rds_instance_no_public_access: assert len(result) == 1 assert result[0].status == "PASS" assert search( - "is not Publicly Accessible", + "is not publicly accessible", result[0].status_extended, ) assert result[0].resource_id == "db-master-1" @@ -133,9 +133,139 @@ class Test_rds_instance_no_public_access: result = check.execute() assert len(result) == 1 - assert result[0].status == "FAIL" + assert result[0].status == "PASS" assert search( - "is set as Publicly Accessible", + "is not publicly accessible", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" + assert result[0].region == AWS_REGION_US_EAST_1 + assert ( + result[0].resource_arn + == f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1" + ) + assert result[0].resource_tags == [] + + @mock_aws + def test_rds_instance_public_with_public_sg(self): + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[ + "SecurityGroups" + ][0] + default_sg_id = default_sg["GroupId"] + ec2_client.authorize_security_group_ingress( + GroupId=default_sg_id, + IpPermissions=[ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": "0.0.0.0/0"}], + } + ], + ) + conn = client("rds", region_name=AWS_REGION_US_EAST_1) + conn.create_db_instance( + DBInstanceIdentifier="db-master-1", + AllocatedStorage=10, + Engine="postgres", + DBName="staging-postgres", + DBInstanceClass="db.m1.small", + PubliclyAccessible=True, + VpcSecurityGroupIds=[default_sg_id], + ) + + from prowler.providers.aws.services.ec2.ec2_service import EC2 + from prowler.providers.aws.services.rds.rds_service import RDS + + audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1]) + audit_info.audit_metadata.expected_checks = [ + "ec2_securitygroup_allow_ingress_from_internet_to_any_port" + ] + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", + new=RDS(audit_info), + ), mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.ec2_client", + new=EC2(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( + rds_instance_no_public_access, + ) + + check = rds_instance_no_public_access() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert search( + "is set as publicly accessible", + result[0].status_extended, + ) + assert result[0].resource_id == "db-master-1" + assert result[0].region == AWS_REGION_US_EAST_1 + assert ( + result[0].resource_arn + == f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1" + ) + assert result[0].resource_tags == [] + + @mock_aws + def test_rds_instance_public_with_filtered_sg(self): + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[ + "SecurityGroups" + ][0] + default_sg_id = default_sg["GroupId"] + ec2_client.authorize_security_group_ingress( + GroupId=default_sg_id, + IpPermissions=[ + { + "IpProtocol": "-1", + "IpRanges": [{"CidrIp": "123.123.123.123/32"}], + } + ], + ) + conn = client("rds", region_name=AWS_REGION_US_EAST_1) + conn.create_db_instance( + DBInstanceIdentifier="db-master-1", + AllocatedStorage=10, + Engine="postgres", + DBName="staging-postgres", + DBInstanceClass="db.m1.small", + PubliclyAccessible=True, + VpcSecurityGroupIds=[default_sg_id], + ) + from prowler.providers.aws.services.rds.rds_service import RDS + + audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client", + new=RDS(audit_info), + ): + # Test Check + from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import ( + rds_instance_no_public_access, + ) + + check = rds_instance_no_public_access() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert search( + "is public but filtered with security groups", result[0].status_extended, ) assert result[0].resource_id == "db-master-1"