refactor(security_groups): general function (#1306)

This commit is contained in:
Pepe Fagoaga
2022-08-03 16:38:29 +02:00
committed by GitHub
parent fe474ae9df
commit 0c2ed53c54
6 changed files with 98 additions and 92 deletions

View File

@@ -1,5 +1,5 @@
from lib.check.models import Check, Check_Report from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check): class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check):
@@ -12,17 +12,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check):
public = False public = False
report = Check_Report(self.metadata) report = Check_Report(self.metadata)
report.region = region report.region = region
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules: for ingress_rule in security_group.ingress_rules:
# Check if the security group is open to the internet to all protocols public = check_security_group(ingress_rule, "-1")
if ( # Check
"0.0.0.0/0" in str(ingress_rule["IpRanges"]) if public:
or "::/0" in str(ingress_rule["Ipv6Ranges"]) report.status = "FAIL"
) and ingress_rule["IpProtocol"] == "-1": report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet."
public = True report.resource_id = security_group.id
report.status = "FAIL" else:
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet." report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet."
report.resource_id = security_group.id report.resource_id = security_group.id

View File

@@ -1,11 +1,11 @@
from lib.check.models import Check, Check_Report from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check): class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check):
def execute(self): def execute(self):
findings = [] findings = []
check_port = 22 check_ports = [22]
for regional_client in ec2_client.regional_clients: for regional_client in ec2_client.regional_clients:
region = regional_client.region region = regional_client.region
if regional_client.security_groups: if regional_client.security_groups:
@@ -13,23 +13,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check):
public = False public = False
report = Check_Report(self.metadata) report = Check_Report(self.metadata)
report.region = region report.region = region
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules: for ingress_rule in security_group.ingress_rules:
if ( public = check_security_group(ingress_rule, "tcp", check_ports)
( # Check
"0.0.0.0/0" in str(ingress_rule["IpRanges"]) if public:
or "::/0" in str(ingress_rule["Ipv6Ranges"]) report.status = "FAIL"
) report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet."
and ( report.resource_id = security_group.id
ingress_rule["FromPort"] == check_port else:
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet." report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet."
report.resource_id = security_group.id report.resource_id = security_group.id

View File

@@ -1,11 +1,11 @@
from lib.check.models import Check, Check_Report from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check): class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check):
def execute(self): def execute(self):
findings = [] findings = []
check_port = 3389 check_ports = [3389]
for regional_client in ec2_client.regional_clients: for regional_client in ec2_client.regional_clients:
region = regional_client.region region = regional_client.region
if regional_client.security_groups: if regional_client.security_groups:
@@ -13,23 +13,15 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check):
public = False public = False
report = Check_Report(self.metadata) report = Check_Report(self.metadata)
report.region = region report.region = region
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules: for ingress_rule in security_group.ingress_rules:
if ( public = check_security_group(ingress_rule, "tcp", check_ports)
( # Check
"0.0.0.0/0" in str(ingress_rule["IpRanges"]) if public:
or "::/0" in str(ingress_rule["Ipv6Ranges"]) report.status = "FAIL"
) report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
and ( report.resource_id = security_group.id
ingress_rule["FromPort"] == check_port else:
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id report.resource_id = security_group.id

View File

@@ -1,11 +1,11 @@
from lib.check.models import Check, Check_Report from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check): class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check):
def execute(self): def execute(self):
findings = [] findings = []
check_port = 3306 check_ports = [3306]
for regional_client in ec2_client.regional_clients: for regional_client in ec2_client.regional_clients:
region = regional_client.region region = regional_client.region
if regional_client.security_groups: if regional_client.security_groups:
@@ -13,25 +13,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check
public = False public = False
report = Check_Report(self.metadata) report = Check_Report(self.metadata)
report.region = region report.region = region
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules: for ingress_rule in security_group.ingress_rules:
if ( public = check_security_group(ingress_rule, "tcp", check_ports)
( # Check
"0.0.0.0/0" in str(ingress_rule["IpRanges"]) if public:
or "::/0" in str(ingress_rule["Ipv6Ranges"]) report.status = "FAIL"
) report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port 3306 open to the Internet."
and ( report.resource_id = security_group.id
ingress_rule["FromPort"] == check_port else:
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL ports open to the Internet." report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet."
report.resource_id = security_group.id report.resource_id = security_group.id
findings.append(report) findings.append(report)
else: else:

View File

@@ -1,12 +1,11 @@
from lib.check.models import Check, Check_Report from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483(Check): class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483(Check):
def execute(self): def execute(self):
findings = [] findings = []
check_port_1 = 1521 check_ports = [1521, 2483]
check_port_2 = 2483
for regional_client in ec2_client.regional_clients: for regional_client in ec2_client.regional_clients:
region = regional_client.region region = regional_client.region
if regional_client.security_groups: if regional_client.security_groups:
@@ -14,31 +13,17 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483
public = False public = False
report = Check_Report(self.metadata) report = Check_Report(self.metadata)
report.region = region report.region = region
# Loop through every security group's ingress rule and check it
for ingress_rule in security_group.ingress_rules: for ingress_rule in security_group.ingress_rules:
if ( public = check_security_group(ingress_rule, "tcp", check_ports)
( # Check
"0.0.0.0/0" in str(ingress_rule["IpRanges"]) if public:
or "::/0" in str(ingress_rule["Ipv6Ranges"]) report.status = "FAIL"
) report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports 1521 and 2483 open to the Internet."
and ( report.resource_id = security_group.id
( else:
ingress_rule["FromPort"] == check_port_1
and ingress_rule["ToPort"] == check_port_1
)
or (
ingress_rule["FromPort"] == check_port_2
and ingress_rule["ToPort"] == check_port_2
)
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports open to the Internet." report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet."
report.resource_id = security_group.id report.resource_id = security_group.id
findings.append(report) findings.append(report)
else: else:

View File

@@ -1,5 +1,6 @@
import threading import threading
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any
from lib.logger import logger from lib.logger import logger
from providers.aws.aws_provider import current_audit_info, generate_regional_clients from providers.aws.aws_provider import current_audit_info, generate_regional_clients
@@ -232,3 +233,49 @@ class NetworkACL:
ec2_client = EC2(current_audit_info) ec2_client = EC2(current_audit_info)
################## Security Groups
# Check if the security group ingress rule has public access to the check_ports using the protocol
def check_security_group(ingress_rule: Any, protocol: str, ports: list = []) -> bool:
public_IPv4 = "0.0.0.0/0"
public_IPv6 = "::/0"
# Check for all traffic ingress rules regardless of the protocol
if ingress_rule["IpProtocol"] == "-1" and (
(
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
)
):
return True
# Check for specific ports in ingress rules
if "FromPort" in ingress_rule:
# All ports
if ingress_rule["FromPort"] == 0 and ingress_rule["ToPort"] == 65535:
return True
# If there is a port range
if ingress_rule["FromPort"] != ingress_rule["ToPort"]:
# Calculate port range, adding 1
diff = (ingress_rule["ToPort"] - ingress_rule["FromPort"]) + 1
ingress_port_range = []
for x in range(diff):
ingress_port_range.append(int(ingress_rule["FromPort"]) + x)
# If FromPort and ToPort are the same
else:
ingress_port_range = []
ingress_port_range.append(int(ingress_rule["FromPort"]))
# Test Security Group
for port in ports:
if (
(
public_IPv4 in str(ingress_rule["IpRanges"])
or public_IPv6 in str(ingress_rule["Ipv6Ranges"])
)
and port in ingress_port_range
and ingress_rule["IpProtocol"] == protocol
):
return True
return False