From 0c2ed53c54980fed643dfb7be585c2220845bd71 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Wed, 3 Aug 2022 16:38:29 +0200 Subject: [PATCH] refactor(security_groups): general function (#1306) --- ...allow_ingress_from_internet_to_any_port.py | 20 ++++---- ...ow_ingress_from_internet_to_tcp_port_22.py | 28 ++++------- ..._ingress_from_internet_to_tcp_port_3389.py | 28 ++++------- ...ss_from_internet_to_tcp_port_mysql_3306.py | 30 +++++------- ...m_internet_to_tcp_port_oracle_1521_2483.py | 37 +++++---------- providers/aws/services/ec2/ec2_service.py | 47 +++++++++++++++++++ 6 files changed, 98 insertions(+), 92 deletions(-) diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py index 86b35073..c542fb84 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py @@ -1,5 +1,5 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check): @@ -12,17 +12,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check): public = False report = Check_Report(self.metadata) report.region = region + # Loop through every security group's ingress rule and check it for ingress_rule in security_group.ingress_rules: - # Check if the security group is open to the internet to all protocols - if ( - "0.0.0.0/0" in str(ingress_rule["IpRanges"]) - or "::/0" in str(ingress_rule["Ipv6Ranges"]) - ) and ingress_rule["IpProtocol"] == "-1": - public = True - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet." - report.resource_id = security_group.id - if not public: + public = check_security_group(ingress_rule, "-1") + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet." + report.resource_id = security_group.id + else: report.status = "PASS" report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet." report.resource_id = security_group.id diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py index 5f502b52..4fc4d7c6 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py @@ -1,11 +1,11 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check): def execute(self): findings = [] - check_port = 22 + check_ports = [22] for regional_client in ec2_client.regional_clients: region = regional_client.region if regional_client.security_groups: @@ -13,23 +13,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check): public = False report = Check_Report(self.metadata) report.region = region + # Loop through every security group's ingress rule and check it for ingress_rule in security_group.ingress_rules: - if ( - ( - "0.0.0.0/0" in str(ingress_rule["IpRanges"]) - or "::/0" in str(ingress_rule["Ipv6Ranges"]) - ) - and ( - ingress_rule["FromPort"] == check_port - and ingress_rule["ToPort"] == check_port - ) - and ingress_rule["IpProtocol"] == "tcp" - ): - public = True - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet." - report.resource_id = security_group.id - if not public: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet." + report.resource_id = security_group.id + else: report.status = "PASS" report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet." report.resource_id = security_group.id diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py index 83b17f0c..7d3f7463 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py @@ -1,11 +1,11 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check): def execute(self): findings = [] - check_port = 3389 + check_ports = [3389] for regional_client in ec2_client.regional_clients: region = regional_client.region if regional_client.security_groups: @@ -13,23 +13,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check): public = False report = Check_Report(self.metadata) report.region = region + # Loop through every security group's ingress rule and check it for ingress_rule in security_group.ingress_rules: - if ( - ( - "0.0.0.0/0" in str(ingress_rule["IpRanges"]) - or "::/0" in str(ingress_rule["Ipv6Ranges"]) - ) - and ( - ingress_rule["FromPort"] == check_port - and ingress_rule["ToPort"] == check_port - ) - and ingress_rule["IpProtocol"] == "tcp" - ): - public = True - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the Microsoft RDP port 3389 open to the Internet." - report.resource_id = security_group.id - if not public: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." + report.resource_id = security_group.id + else: report.status = "PASS" report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." report.resource_id = security_group.id diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py index 75f5db5e..dbca655a 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py @@ -1,11 +1,11 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check): def execute(self): findings = [] - check_port = 3306 + check_ports = [3306] for regional_client in ec2_client.regional_clients: region = regional_client.region if regional_client.security_groups: @@ -13,25 +13,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check public = False report = Check_Report(self.metadata) report.region = region + # Loop through every security group's ingress rule and check it for ingress_rule in security_group.ingress_rules: - if ( - ( - "0.0.0.0/0" in str(ingress_rule["IpRanges"]) - or "::/0" in str(ingress_rule["Ipv6Ranges"]) - ) - and ( - ingress_rule["FromPort"] == check_port - and ingress_rule["ToPort"] == check_port - ) - and ingress_rule["IpProtocol"] == "tcp" - ): - public = True - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port open to the Internet." - report.resource_id = security_group.id - if not public: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port 3306 open to the Internet." + report.resource_id = security_group.id + else: report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL ports open to the Internet." + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet." report.resource_id = security_group.id findings.append(report) else: diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py index b2e6d57f..96e5e17a 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py @@ -1,12 +1,11 @@ from lib.check.models import Check, Check_Report -from providers.aws.services.ec2.ec2_service import ec2_client +from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483(Check): def execute(self): findings = [] - check_port_1 = 1521 - check_port_2 = 2483 + check_ports = [1521, 2483] for regional_client in ec2_client.regional_clients: region = regional_client.region if regional_client.security_groups: @@ -14,31 +13,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483 public = False report = Check_Report(self.metadata) report.region = region + # Loop through every security group's ingress rule and check it for ingress_rule in security_group.ingress_rules: - if ( - ( - "0.0.0.0/0" in str(ingress_rule["IpRanges"]) - or "::/0" in str(ingress_rule["Ipv6Ranges"]) - ) - and ( - ( - ingress_rule["FromPort"] == check_port_1 - and ingress_rule["ToPort"] == check_port_1 - ) - or ( - ingress_rule["FromPort"] == check_port_2 - and ingress_rule["ToPort"] == check_port_2 - ) - ) - and ingress_rule["IpProtocol"] == "tcp" - ): - public = True - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports open to the Internet." - report.resource_id = security_group.id - if not public: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports 1521 and 2483 open to the Internet." + report.resource_id = security_group.id + else: report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports open to the Internet." + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet." report.resource_id = security_group.id findings.append(report) else: diff --git a/providers/aws/services/ec2/ec2_service.py b/providers/aws/services/ec2/ec2_service.py index 9bbf476e..be0beead 100644 --- a/providers/aws/services/ec2/ec2_service.py +++ b/providers/aws/services/ec2/ec2_service.py @@ -1,5 +1,6 @@ import threading from dataclasses import dataclass +from typing import Any from lib.logger import logger from providers.aws.aws_provider import current_audit_info, generate_regional_clients @@ -232,3 +233,49 @@ class NetworkACL: ec2_client = EC2(current_audit_info) + +################## Security Groups +# Check if the security group ingress rule has public access to the check_ports using the protocol +def check_security_group(ingress_rule: Any, protocol: str, ports: list = []) -> bool: + public_IPv4 = "0.0.0.0/0" + public_IPv6 = "::/0" + + # Check for all traffic ingress rules regardless of the protocol + if ingress_rule["IpProtocol"] == "-1" and ( + ( + "0.0.0.0/0" in str(ingress_rule["IpRanges"]) + or "::/0" in str(ingress_rule["Ipv6Ranges"]) + ) + ): + return True + + # Check for specific ports in ingress rules + if "FromPort" in ingress_rule: + # All ports + if ingress_rule["FromPort"] == 0 and ingress_rule["ToPort"] == 65535: + return True + + # If there is a port range + if ingress_rule["FromPort"] != ingress_rule["ToPort"]: + # Calculate port range, adding 1 + diff = (ingress_rule["ToPort"] - ingress_rule["FromPort"]) + 1 + ingress_port_range = [] + for x in range(diff): + ingress_port_range.append(int(ingress_rule["FromPort"]) + x) + # If FromPort and ToPort are the same + else: + ingress_port_range = [] + ingress_port_range.append(int(ingress_rule["FromPort"])) + + # Test Security Group + for port in ports: + if ( + ( + public_IPv4 in str(ingress_rule["IpRanges"]) + or public_IPv6 in str(ingress_rule["Ipv6Ranges"]) + ) + and port in ingress_port_range + and ingress_rule["IpProtocol"] == protocol + ): + return True + return False