fix(ec2_securitygroup_default_restrict_traffic): fix check only allow empty rules (#2777)

This commit is contained in:
Nacho Rivera
2023-08-25 12:42:26 +02:00
committed by GitHub
parent 2386c71c4f
commit 276f6f9fb1
2 changed files with 140 additions and 111 deletions

View File

@@ -1,6 +1,5 @@
from prowler.lib.check.models import Check, Check_Report_AWS from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
class ec2_securitygroup_default_restrict_traffic(Check): class ec2_securitygroup_default_restrict_traffic(Check):
@@ -15,13 +14,14 @@ class ec2_securitygroup_default_restrict_traffic(Check):
report.resource_tags = security_group.tags report.resource_tags = security_group.tags
# Find default security group # Find default security group
if security_group.name == "default": if security_group.name == "default":
report.status = "PASS" report.status = "FAIL"
report.status_extended = f"Default Security Group ({security_group.id}) is not open to the Internet." report.status_extended = (
for ingress_rule in security_group.ingress_rules: f"Default Security Group ({security_group.id}) rules allow traffic."
if check_security_group(ingress_rule, "-1", any_address=True): )
report.status = "FAIL" if not security_group.ingress_rules and not security_group.egress_rules:
report.status_extended = f"Default Security Group ({security_group.id}) is open to the Internet." report.status = "PASS"
break report.status_extended = f"Default Security Group ({security_group.id}) rules do not allow traffic."
findings.append(report) findings.append(report)
return findings return findings

View File

@@ -28,7 +28,7 @@ class Test_ec2_securitygroup_default_restrict_traffic:
profile_region=None, profile_region=None,
credentials=None, credentials=None,
assumed_role_info=None, assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"], audited_regions=["us-east-1"],
organizations_metadata=None, organizations_metadata=None,
audit_resources=None, audit_resources=None,
mfa_enabled=False, mfa_enabled=False,
@@ -43,102 +43,24 @@ class Test_ec2_securitygroup_default_restrict_traffic:
return audit_info return audit_info
@mock_ec2 @mock_ec2
def test_ec2_default_sgs(self): def test_ec2_compliant_sg(self):
# Create EC2 Mocked Resources # Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION) ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
ec2_securitygroup_default_restrict_traffic,
)
check = ec2_securitygroup_default_restrict_traffic()
result = check.execute()
# One default sg per region
assert len(result) == 3
# All are compliant by default
assert result[0].status == "PASS"
assert result[1].status == "PASS"
assert result[2].status == "PASS"
@mock_ec2
def test_ec2_non_compliant_default_sg(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[ default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups" "SecurityGroups"
][0] ][0]
default_sg_id = default_sg["GroupId"] default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"] default_sg_name = default_sg["GroupName"]
ec2_client.authorize_security_group_ingress( ec2_client.revoke_security_group_egress(
GroupId=default_sg_id,
IpPermissions=[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}],
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
ec2_securitygroup_default_restrict_traffic,
)
check = ec2_securitygroup_default_restrict_traffic()
result = check.execute()
# One default sg per region
assert len(result) == 3
# Search changed sg
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "FAIL"
assert (
sg.status_extended
== f"Default Security Group ({default_sg_id}) is open to the Internet."
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
assert sg.resource_details == default_sg_name
assert sg.resource_tags == []
@mock_ec2
def test_ec2_compliant_default_sg(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id, GroupId=default_sg_id,
IpPermissions=[ IpPermissions=[
{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "10.11.12.13/32"}]} {
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
], ],
) )
@@ -162,18 +84,125 @@ class Test_ec2_securitygroup_default_restrict_traffic:
result = check.execute() result = check.execute()
# One default sg per region # One default sg per region
assert len(result) == 3 assert len(result) == 1
# Search changed sg assert result[0].status == "PASS"
for sg in result: assert (
if sg.resource_id == default_sg_id: result[0].status_extended
assert sg.status == "PASS" == f"Default Security Group ({default_sg_id}) rules do not allow traffic."
assert ( )
sg.status_extended assert (
== f"Default Security Group ({default_sg_id}) is not open to the Internet." result[0].resource_arn
) == f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
assert ( )
sg.resource_arn assert result[0].resource_details == default_sg_name
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}" assert result[0].resource_tags == []
) assert result[0].region == AWS_REGION
assert sg.resource_details == default_sg_name assert result[0].resource_id == default_sg_id
assert sg.resource_tags == []
@mock_ec2
def test_ec2_non_compliant_sg_ingress_rule(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "10.0.0.16/0"}]}
],
)
ec2_client.revoke_security_group_egress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"PrefixListIds": [],
"UserIdGroupPairs": [],
}
],
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
ec2_securitygroup_default_restrict_traffic,
)
check = ec2_securitygroup_default_restrict_traffic()
result = check.execute()
# One default sg per region
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Default Security Group ({default_sg_id}) rules allow traffic."
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
assert result[0].resource_details == default_sg_name
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
assert result[0].resource_id == default_sg_id
@mock_ec2
def test_ec2_non_compliant_sg_egress_rule(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
from prowler.providers.aws.services.ec2.ec2_service import EC2
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic.ec2_client",
new=EC2(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_securitygroup_default_restrict_traffic.ec2_securitygroup_default_restrict_traffic import (
ec2_securitygroup_default_restrict_traffic,
)
check = ec2_securitygroup_default_restrict_traffic()
result = check.execute()
# One default sg per region
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Default Security Group ({default_sg_id}) rules allow traffic."
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
assert result[0].resource_details == default_sg_name
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
assert result[0].resource_id == default_sg_id