From f30245bb1534ea9c231f72d77893709decb5d35a Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Thu, 4 Aug 2022 16:33:16 +0200 Subject: [PATCH] fix(nacls): Handle IPv6 source ingress (#1319) --- ...c2_networkacl_allow_ingress_tcp_port_22.py | 25 ++++++++--------- ..._networkacl_allow_ingress_tcp_port_3389.py | 22 ++++++--------- providers/aws/services/ec2/ec2_service.py | 27 +++++++++++++++++++ 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py index 06d65b99..f50df65e 100644 --- a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py +++ b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py @@ -1,29 +1,26 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_network_acl, ec2_client class ec2_networkacl_allow_ingress_tcp_port_22(Check): def execute(self): findings = [] + tcp_protocol = "6" check_port = 22 for network_acl in ec2_client.network_acls: public = False report = Check_Report(self.metadata) report.region = network_acl.region for entry in network_acl.entries: - if ( - entry["CidrBlock"] == "0.0.0.0/0" - and entry["RuleAction"] == "allow" - and not entry["Egress"] - ): - if entry["Protocol"] == "-1": - public = True - elif ( - entry["PortRange"]["From"] == check_port - and entry["PortRange"]["To"] == check_port - and entry["Protocol"] == "6" - ): - public = True + # For IPv4 + if "CidrBlock" in entry: + public = check_network_acl(entry, tcp_protocol, check_port, "IPv4") + # For IPv6 + if "Ipv6CidrBlock" in entry: + public = check_network_acl(entry, tcp_protocol, check_port, "IPv6") + # If some entry allows it, that ACL is not securely configured + if public: + break if not public: report.status = "PASS" report.status_extended = f"Network ACL {network_acl.id} has not SSH port 22 open to the Internet." diff --git a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py index 34863ba5..8b991e40 100644 --- a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py +++ b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py @@ -1,29 +1,23 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_network_acl, ec2_client class ec2_networkacl_allow_ingress_tcp_port_3389(Check): def execute(self): findings = [] + tcp_protocol = "6" check_port = 3389 for network_acl in ec2_client.network_acls: public = False report = Check_Report(self.metadata) report.region = network_acl.region for entry in network_acl.entries: - if ( - entry["CidrBlock"] == "0.0.0.0/0" - and entry["RuleAction"] == "allow" - and not entry["Egress"] - ): - if entry["Protocol"] == "-1": - public = True - elif ( - entry["PortRange"]["From"] == check_port - and entry["PortRange"]["To"] == check_port - and entry["Protocol"] == "6" - ): - public = True + # For IPv4 + if "CidrBlock" in entry: + public = check_network_acl(entry, tcp_protocol, check_port, "IPv4") + # For IPv6 + if "Ipv6CidrBlock" in entry: + public = check_network_acl(entry, tcp_protocol, check_port, "IPv6") if not public: report.status = "PASS" report.status_extended = f"Network ACL {network_acl.id} has not Microsoft RDP port 3389 open to the Internet." diff --git a/providers/aws/services/ec2/ec2_service.py b/providers/aws/services/ec2/ec2_service.py index 81d45e65..e3b49864 100644 --- a/providers/aws/services/ec2/ec2_service.py +++ b/providers/aws/services/ec2/ec2_service.py @@ -284,3 +284,30 @@ def check_security_group(ingress_rule: Any, protocol: str, ports: list = []) -> ): return True return False + + +################## Network ACLs +# Check if the network acls ingress rule has public access to the check_ports using the protocol +def check_network_acl(entry: Any, protocol: str, port: str, ip_version: str) -> bool: + # For IPv4 + if ip_version == "IPv4": + entry_value = "CidrBlock" + public_ip = "0.0.0.0/0" + # For IPv6 + elif ip_version == "IPv6": + entry_value = "Ipv6CidrBlock" + public_ip = "::/0" + + if ( + entry[entry_value] == public_ip + and entry["RuleAction"] == "allow" + and not entry["Egress"] + ): + if entry["Protocol"] == "-1" or ( + entry["PortRange"]["From"] == port + and entry["PortRange"]["To"] == port + and entry["Protocol"] == protocol + ): + return True + + return False