From 690ec057c3c4f7c32e94871a7c191f3c9239cf1c Mon Sep 17 00:00:00 2001 From: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Date: Tue, 14 Nov 2023 12:03:01 +0100 Subject: [PATCH] fix(ec2_securitygroup_not_used): check if security group is associated (#3026) --- .../ec2_securitygroup_not_used.py | 10 ++- .../providers/aws/services/ec2/ec2_service.py | 8 +- .../ec2_securitygroup_not_used_test.py | 85 +++++++++++++++++++ 3 files changed, 101 insertions(+), 2 deletions(-) diff --git a/prowler/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used.py b/prowler/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used.py index d7d9daa8..8a232b33 100644 --- a/prowler/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used.py +++ b/prowler/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used.py @@ -18,10 +18,18 @@ class ec2_securitygroup_not_used(Check): report.status = "PASS" report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is being used." sg_in_lambda = False + sg_associated = False for function in awslambda_client.functions.values(): if security_group.id in function.security_groups: sg_in_lambda = True - if len(security_group.network_interfaces) == 0 and not sg_in_lambda: + for sg in ec2_client.security_groups: + if security_group.id in sg.associated_sgs: + sg_associated = True + if ( + len(security_group.network_interfaces) == 0 + and not sg_in_lambda + and not sg_associated + ): report.status = "FAIL" report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is not being used." diff --git a/prowler/providers/aws/services/ec2/ec2_service.py b/prowler/providers/aws/services/ec2/ec2_service.py index b57a90a7..4838f041 100644 --- a/prowler/providers/aws/services/ec2/ec2_service.py +++ b/prowler/providers/aws/services/ec2/ec2_service.py @@ -117,6 +117,7 @@ class EC2(AWSService): if not self.audit_resources or ( is_resource_filtered(arn, self.audit_resources) ): + associated_sgs = [] # check if sg has public access to all ports all_public_ports = False for ingress_rule in sg["IpPermissions"]: @@ -128,7 +129,10 @@ class EC2(AWSService): in self.audited_checks ): all_public_ports = True - break + # check associated security groups + for sg_group in ingress_rule.get("UserIdGroupPairs", []): + if sg_group.get("GroupId"): + associated_sgs.append(sg_group["GroupId"]) self.security_groups.append( SecurityGroup( name=sg["GroupName"], @@ -138,6 +142,7 @@ class EC2(AWSService): ingress_rules=sg["IpPermissions"], egress_rules=sg["IpPermissionsEgress"], public_ports=all_public_ports, + associated_sgs=associated_sgs, vpc_id=sg["VpcId"], tags=sg.get("Tags"), ) @@ -464,6 +469,7 @@ class SecurityGroup(BaseModel): id: str vpc_id: str public_ports: bool + associated_sgs: list network_interfaces: list[str] = [] ingress_rules: list[dict] egress_rules: list[dict] diff --git a/tests/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used_test.py b/tests/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used_test.py index cf216574..81b0ea50 100644 --- a/tests/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used_test.py +++ b/tests/providers/aws/services/ec2/ec2_securitygroup_not_used/ec2_securitygroup_not_used_test.py @@ -244,3 +244,88 @@ class Test_ec2_securitygroup_not_used: assert result[0].resource_id == sg.id assert result[0].resource_details == sg_name assert result[0].resource_tags == [] + + @mock_ec2 + @mock_lambda + def test_ec2_associated_sg(self): + # Create EC2 Mocked Resources + ec2 = resource("ec2", AWS_REGION_US_EAST_1) + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"] + sg_name = "test-sg" + sg_name1 = "test-sg1" + sg = ec2.create_security_group( + GroupName=sg_name, Description="test", VpcId=vpc_id + ) + sg1 = ec2.create_security_group( + GroupName=sg_name1, Description="test1", VpcId=vpc_id + ) + + ec2_client.authorize_security_group_ingress( + GroupId=sg.id, + IpPermissions=[ + { + "IpProtocol": "-1", + "UserIdGroupPairs": [ + { + "GroupId": sg1.id, + "Description": "Allow traffic from source SG", + } + ], + } + ], + ) + + from prowler.providers.aws.services.awslambda.awslambda_service import Lambda + from prowler.providers.aws.services.ec2.ec2_service import EC2 + + current_audit_info = set_mocked_aws_audit_info( + audited_regions=["us-east-1", "eu-west-1"] + ) + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ), mock.patch( + "prowler.providers.aws.services.ec2.ec2_securitygroup_not_used.ec2_securitygroup_not_used.ec2_client", + new=EC2(current_audit_info), + ), mock.patch( + "prowler.providers.aws.services.ec2.ec2_securitygroup_not_used.ec2_securitygroup_not_used.awslambda_client", + new=Lambda(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.ec2.ec2_securitygroup_not_used.ec2_securitygroup_not_used import ( + ec2_securitygroup_not_used, + ) + + check = ec2_securitygroup_not_used() + result = check.execute() + + # One custom sg + assert len(result) == 2 + assert result[0].status == "FAIL" + assert result[0].region == AWS_REGION_US_EAST_1 + assert ( + result[0].status_extended + == f"Security group {sg_name} ({sg.id}) it is not being used." + ) + assert ( + result[0].resource_arn + == f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION_US_EAST_1}:{current_audit_info.audited_account}:security-group/{sg.id}" + ) + assert result[0].resource_id == sg.id + assert result[0].resource_details == sg_name + assert result[0].resource_tags == [] + assert result[1].status == "PASS" + assert result[1].region == AWS_REGION_US_EAST_1 + assert ( + result[1].status_extended + == f"Security group {sg_name1} ({sg1.id}) it is being used." + ) + assert ( + result[1].resource_arn + == f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION_US_EAST_1}:{current_audit_info.audited_account}:security-group/{sg1.id}" + ) + assert result[1].resource_id == sg1.id + assert result[1].resource_details == sg_name1 + assert result[1].resource_tags == []