diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index afbfe3dc..0b8b3b42 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -55,7 +55,7 @@ repos: language: system - id: safety - name: bandit + name: safety description: 'Safety is a tool that checks your installed dependencies for known security vulnerabilities' entry: bash -c 'safety check' language: system diff --git a/lib/outputs/outputs.py b/lib/outputs/outputs.py index 1b724763..86fd0235 100644 --- a/lib/outputs/outputs.py +++ b/lib/outputs/outputs.py @@ -42,53 +42,59 @@ def report(check_findings, output_options, audit_info): csv_fields, ) - for finding in check_findings: - # Print findings by stdout - color = set_report_color(finding.status) - if output_options.is_quiet and "FAIL" in finding.status: - print( - f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" - ) - elif not output_options.is_quiet: - print( - f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" - ) - if file_descriptors: - - # sending the finding to input options - if "csv" in file_descriptors: - finding_output = Check_Output_CSV( - audit_info.audited_account, - audit_info.profile, - finding, - audit_info.organizations_metadata, + if check_findings: + for finding in check_findings: + # Print findings by stdout + color = set_report_color(finding.status) + if output_options.is_quiet and "FAIL" in finding.status: + print( + f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" ) - csv_writer = DictWriter( - file_descriptors["csv"], fieldnames=csv_fields, delimiter=";" + elif not output_options.is_quiet: + print( + f"\t{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.status_extended}" ) - csv_writer.writerow(finding_output.__dict__) + + if file_descriptors: - if "json" in file_descriptors: - finding_output = Check_Output_JSON(**finding.check_metadata.dict()) - fill_json(finding_output, audit_info, finding) + # sending the finding to input options + if "csv" in file_descriptors: + finding_output = Check_Output_CSV( + audit_info.audited_account, + audit_info.profile, + finding, + audit_info.organizations_metadata, + ) + csv_writer = DictWriter( + file_descriptors["csv"], fieldnames=csv_fields, delimiter=";" + ) + csv_writer.writerow(finding_output.__dict__) - json.dump(finding_output.dict(), file_descriptors["json"], indent=4) - file_descriptors["json"].write(",") + if "json" in file_descriptors: + finding_output = Check_Output_JSON(**finding.check_metadata.dict()) + fill_json(finding_output, audit_info, finding) - if "json-asff" in file_descriptors: - finding_output = Check_Output_JSON_ASFF() - fill_json_asff(finding_output, audit_info, finding) + json.dump(finding_output.dict(), file_descriptors["json"], indent=4) + file_descriptors["json"].write(",") - json.dump( - finding_output.dict(), file_descriptors["json-asff"], indent=4 - ) - file_descriptors["json-asff"].write(",") + if "json-asff" in file_descriptors: + finding_output = Check_Output_JSON_ASFF() + fill_json_asff(finding_output, audit_info, finding) - # Check if it is needed to send findings to security hub - if output_options.security_hub_enabled: - send_to_security_hub( - finding.region, finding_output, audit_info.audit_session - ) + json.dump( + finding_output.dict(), file_descriptors["json-asff"], indent=4 + ) + file_descriptors["json-asff"].write(",") + + # Check if it is needed to send findings to security hub + if output_options.security_hub_enabled: + send_to_security_hub( + finding.region, finding_output, audit_info.audit_session + ) + else: # No service resources in the whole account + color = set_report_color("PASS") + if not output_options.is_quiet: + print(f"{color}PASS{Style.RESET_ALL} There are no resources.") if file_descriptors: # Close all file descriptors diff --git a/providers/aws/aws_provider.py b/providers/aws/aws_provider.py index 7098caa9..2a548779 100644 --- a/providers/aws/aws_provider.py +++ b/providers/aws/aws_provider.py @@ -309,7 +309,7 @@ def get_organizations_metadata( def generate_regional_clients(service, audit_info): - regional_clients = [] + regional_clients = {} # Get json locally f = open_file(aws_services_json_file) data = parse_json_file(f) @@ -323,8 +323,8 @@ def generate_regional_clients(service, audit_info): for region in regions: regional_client = audit_info.audit_session.client(service, region_name=region) regional_client.region = region - regional_clients.append(regional_client) - + regional_clients[region] = regional_client + # regional_clients.append(regional_client) return regional_clients diff --git a/providers/aws/services/ec2/ec2_ebs_public_snapshot/ec2_ebs_public_snapshot.py b/providers/aws/services/ec2/ec2_ebs_public_snapshot/ec2_ebs_public_snapshot.py index 68d34797..80b1ae08 100644 --- a/providers/aws/services/ec2/ec2_ebs_public_snapshot/ec2_ebs_public_snapshot.py +++ b/providers/aws/services/ec2/ec2_ebs_public_snapshot/ec2_ebs_public_snapshot.py @@ -5,31 +5,19 @@ from providers.aws.services.ec2.ec2_service import ec2_client class ec2_ebs_public_snapshot(Check): def execute(self): findings = [] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.snapshots: - for snapshot in regional_client.snapshots: - report = Check_Report(self.metadata) - report.region = region - if not snapshot.public: - report.status = "PASS" - report.status_extended = ( - f"EBS Snapshot {snapshot.id} is not Public" - ) - report.resource_id = snapshot.id - else: - report.status = "FAIL" - report.status_extended = ( - f"EBS Snapshot {snapshot.id} is currently Public" - ) - report.resource_id = snapshot.id - findings.append(report) - else: - report = Check_Report(self.metadata) + for snapshot in ec2_client.snapshots: + report = Check_Report(self.metadata) + report.region = snapshot.region + if not snapshot.public: report.status = "PASS" - report.status_extended = "There are no EC2 EBS snapshots" - report.region = region - - findings.append(report) + report.status_extended = f"EBS Snapshot {snapshot.id} is not Public" + report.resource_id = snapshot.id + else: + report.status = "FAIL" + report.status_extended = ( + f"EBS Snapshot {snapshot.id} is currently Public" + ) + report.resource_id = snapshot.id + findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py b/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py index a0a2bb0a..cdde6519 100644 --- a/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py +++ b/providers/aws/services/ec2/ec2_ebs_snapshots_encrypted/ec2_ebs_snapshots_encrypted.py @@ -5,31 +5,17 @@ from providers.aws.services.ec2.ec2_service import ec2_client class ec2_ebs_snapshots_encrypted(Check): def execute(self): findings = [] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.snapshots: - for snapshot in regional_client.snapshots: - report = Check_Report(self.metadata) - report.region = region - if snapshot.encrypted: - report.status = "PASS" - report.status_extended = ( - f"EBS Snapshot {snapshot.id} is encrypted" - ) - report.resource_id = snapshot.id - else: - report.status = "FAIL" - report.status_extended = ( - f"EBS Snapshot {snapshot.id} is unencrypted" - ) - report.resource_id = snapshot.id - findings.append(report) - else: - report = Check_Report(self.metadata) + for snapshot in ec2_client.snapshots: + report = Check_Report(self.metadata) + report.region = snapshot.region + if snapshot.encrypted: report.status = "PASS" - report.status_extended = "There are no EC2 EBS snapshots" - report.region = region - - findings.append(report) + report.status_extended = f"EBS Snapshot {snapshot.id} is encrypted" + report.resource_id = snapshot.id + else: + report.status = "FAIL" + report.status_extended = f"EBS Snapshot {snapshot.id} is unencrypted" + report.resource_id = snapshot.id + findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_instance_public_ip/ec2_instance_public_ip.py b/providers/aws/services/ec2/ec2_instance_public_ip/ec2_instance_public_ip.py index da55af99..18e3c0b8 100644 --- a/providers/aws/services/ec2/ec2_instance_public_ip/ec2_instance_public_ip.py +++ b/providers/aws/services/ec2/ec2_instance_public_ip/ec2_instance_public_ip.py @@ -5,29 +5,19 @@ from providers.aws.services.ec2.ec2_service import ec2_client class ec2_instance_public_ip(Check): def execute(self): findings = [] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.instances: - for instance in regional_client.instances: - report = Check_Report(self.metadata) - report.region = region - if instance.public_ip: - report.status = "FAIL" - report.status_extended = f"EC2 instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})." - report.resource_id = instance.id - else: - report.status = "PASS" - report.status_extended = ( - f"EC2 instance {instance.id} has not a Public IP." - ) - report.resource_id = instance.id - findings.append(report) + for instance in ec2_client.instances: + report = Check_Report(self.metadata) + report.region = instance.region + if instance.public_ip: + report.status = "FAIL" + report.status_extended = f"EC2 instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})." + report.resource_id = instance.id else: - report = Check_Report(self.metadata) report.status = "PASS" - report.status_extended = "There are no EC2 instances." - report.region = region - - findings.append(report) + report.status_extended = ( + f"EC2 instance {instance.id} has not a Public IP." + ) + report.resource_id = instance.id + findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py index dba8f677..06d65b99 100644 --- a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py +++ b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_22/ec2_networkacl_allow_ingress_tcp_port_22.py @@ -6,40 +6,32 @@ class ec2_networkacl_allow_ingress_tcp_port_22(Check): def execute(self): findings = [] check_port = 22 - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.network_acls: - for network_acl in regional_client.network_acls: - public = False - report = Check_Report(self.metadata) - report.region = region - for entry in network_acl.entries: - if ( - entry["CidrBlock"] == "0.0.0.0/0" - and entry["RuleAction"] == "allow" - and not entry["Egress"] - and "PortRange" in entry - and entry["Protocol"] == "6" # 6 relates to tcp protocol - ): - if ( - entry["PortRange"]["From"] == check_port - and entry["PortRange"]["To"] == check_port - ): - public = True - report.status = "FAIL" - report.status_extended = f"Network ACL {network_acl.id} has SSH port 22 open to the Internet." - report.resource_id = network_acl.id - if not public: - report.status = "PASS" - report.status_extended = f"Network ACL {network_acl.id} has not SSH port 22 open to the Internet." - report.resource_id = network_acl.id - findings.append(report) - else: - report = Check_Report(self.metadata) + for network_acl in ec2_client.network_acls: + public = False + report = Check_Report(self.metadata) + report.region = network_acl.region + for entry in network_acl.entries: + if ( + entry["CidrBlock"] == "0.0.0.0/0" + and entry["RuleAction"] == "allow" + and not entry["Egress"] + ): + if entry["Protocol"] == "-1": + public = True + elif ( + entry["PortRange"]["From"] == check_port + and entry["PortRange"]["To"] == check_port + and entry["Protocol"] == "6" + ): + public = True + if not public: report.status = "PASS" - report.status_extended = "There are no EC2 network acls." - report.region = region - - findings.append(report) + report.status_extended = f"Network ACL {network_acl.id} has not SSH port 22 open to the Internet." + report.resource_id = network_acl.id + else: + report.status = "FAIL" + report.status_extended = f"Network ACL {network_acl.id} has SSH port 22 open to the Internet." + report.resource_id = network_acl.id + findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py index f2bdfb0c..34863ba5 100644 --- a/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py +++ b/providers/aws/services/ec2/ec2_networkacl_allow_ingress_tcp_port_3389/ec2_networkacl_allow_ingress_tcp_port_3389.py @@ -6,40 +6,32 @@ class ec2_networkacl_allow_ingress_tcp_port_3389(Check): def execute(self): findings = [] check_port = 3389 - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.network_acls: - for network_acl in regional_client.network_acls: - public = False - report = Check_Report(self.metadata) - report.region = region - for entry in network_acl.entries: - if ( - entry["CidrBlock"] == "0.0.0.0/0" - and entry["RuleAction"] == "allow" - and not entry["Egress"] - and "PortRange" in entry - and entry["Protocol"] == "6" # 6 relates to tcp protocol - ): - if ( - entry["PortRange"]["From"] == check_port - and entry["PortRange"]["To"] == check_port - ): - public = True - report.status = "FAIL" - report.status_extended = f"Network ACL {network_acl.id} has Microsoft RDP port 3389 open to the Internet." - report.resource_id = network_acl.id - if not public: - report.status = "PASS" - report.status_extended = f"Network ACL {network_acl.id} has not Microsoft RDP port 3389 open to the Internet." - report.resource_id = network_acl.id - findings.append(report) - else: - report = Check_Report(self.metadata) + for network_acl in ec2_client.network_acls: + public = False + report = Check_Report(self.metadata) + report.region = network_acl.region + for entry in network_acl.entries: + if ( + entry["CidrBlock"] == "0.0.0.0/0" + and entry["RuleAction"] == "allow" + and not entry["Egress"] + ): + if entry["Protocol"] == "-1": + public = True + elif ( + entry["PortRange"]["From"] == check_port + and entry["PortRange"]["To"] == check_port + and entry["Protocol"] == "6" + ): + public = True + if not public: report.status = "PASS" - report.status_extended = "There are no EC2 network acls." - report.region = region - - findings.append(report) + report.status_extended = f"Network ACL {network_acl.id} has not Microsoft RDP port 3389 open to the Internet." + report.resource_id = network_acl.id + else: + report.status = "FAIL" + report.status_extended = f"Network ACL {network_acl.id} has Microsoft RDP port 3389 open to the Internet." + report.resource_id = network_acl.id + findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py index c542fb84..d5721bd7 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port/ec2_securitygroup_allow_ingress_from_internet_to_any_port.py @@ -5,32 +5,22 @@ from providers.aws.services.ec2.ec2_service import check_security_group, ec2_cli class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check): def execute(self): findings = [] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.security_groups: - for security_group in regional_client.security_groups: - public = False - report = Check_Report(self.metadata) - report.region = region - # Loop through every security group's ingress rule and check it - for ingress_rule in security_group.ingress_rules: - public = check_security_group(ingress_rule, "-1") - # Check - if public: - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet." - report.resource_id = security_group.id - else: - report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet." - report.resource_id = security_group.id - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There are no EC2 security groups." - report.region = region - + for security_group in ec2_client.security_groups: + public = False + report = Check_Report(self.metadata) + report.region = security_group.region + # Loop through every security group's ingress rule and check it + for ingress_rule in security_group.ingress_rules: + public = check_security_group(ingress_rule, "-1") + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet." + report.resource_id = security_group.id + else: + report.status = "PASS" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet." + report.resource_id = security_group.id findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py index 4fc4d7c6..440af5e7 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22.py @@ -6,32 +6,22 @@ class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check): def execute(self): findings = [] check_ports = [22] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.security_groups: - for security_group in regional_client.security_groups: - public = False - report = Check_Report(self.metadata) - report.region = region - # Loop through every security group's ingress rule and check it - for ingress_rule in security_group.ingress_rules: - public = check_security_group(ingress_rule, "tcp", check_ports) - # Check - if public: - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet." - report.resource_id = security_group.id - else: - report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet." - report.resource_id = security_group.id - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There are no EC2 security groups." - report.region = region - + for security_group in ec2_client.security_groups: + public = False + report = Check_Report(self.metadata) + report.region = security_group.region + # Loop through every security group's ingress rule and check it + for ingress_rule in security_group.ingress_rules: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet." + report.resource_id = security_group.id + else: + report.status = "PASS" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet." + report.resource_id = security_group.id findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py index 7d3f7463..110049f3 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389.py @@ -1,37 +1,26 @@ from lib.check.models import Check, Check_Report from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client - class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check): def execute(self): findings = [] check_ports = [3389] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.security_groups: - for security_group in regional_client.security_groups: - public = False - report = Check_Report(self.metadata) - report.region = region - # Loop through every security group's ingress rule and check it - for ingress_rule in security_group.ingress_rules: - public = check_security_group(ingress_rule, "tcp", check_ports) - # Check - if public: - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." - report.resource_id = security_group.id - else: - report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." - report.resource_id = security_group.id - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There are no EC2 security groups." - report.region = region - + for security_group in ec2_client.security_groups: + public = False + report = Check_Report(self.metadata) + report.region = security_group.region + # Loop through every security group's ingress rule and check it + for ingress_rule in security_group.ingress_rules: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Microsoft RDP port 3389 open to the Internet." + report.resource_id = security_group.id + else: + report.status = "PASS" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet." + report.resource_id = security_group.id findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py index dbca655a..3115bca8 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306.py @@ -1,37 +1,26 @@ from lib.check.models import Check, Check_Report from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client - class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check): def execute(self): findings = [] check_ports = [3306] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.security_groups: - for security_group in regional_client.security_groups: - public = False - report = Check_Report(self.metadata) - report.region = region - # Loop through every security group's ingress rule and check it - for ingress_rule in security_group.ingress_rules: - public = check_security_group(ingress_rule, "tcp", check_ports) - # Check - if public: - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port 3306 open to the Internet." - report.resource_id = security_group.id - else: - report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet." - report.resource_id = security_group.id - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There are no EC2 security groups." - report.region = region - + for security_group in ec2_client.security_groups: + public = False + report = Check_Report(self.metadata) + report.region = security_group.region + # Loop through every security group's ingress rule and check it + for ingress_rule in security_group.ingress_rules: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port 3306 open to the Internet." + report.resource_id = security_group.id + else: + report.status = "PASS" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL port 3306 open to the Internet." + report.resource_id = security_group.id findings.append(report) return findings diff --git a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py index 96e5e17a..8a6aa5ca 100644 --- a/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py +++ b/providers/aws/services/ec2/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483/ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483.py @@ -1,37 +1,26 @@ from lib.check.models import Check, Check_Report from providers.aws.services.ec2.ec2_service import check_security_group, ec2_client - class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483(Check): def execute(self): findings = [] check_ports = [1521, 2483] - for regional_client in ec2_client.regional_clients: - region = regional_client.region - if regional_client.security_groups: - for security_group in regional_client.security_groups: - public = False - report = Check_Report(self.metadata) - report.region = region - # Loop through every security group's ingress rule and check it - for ingress_rule in security_group.ingress_rules: - public = check_security_group(ingress_rule, "tcp", check_ports) - # Check - if public: - report.status = "FAIL" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports 1521 and 2483 open to the Internet." - report.resource_id = security_group.id - else: - report.status = "PASS" - report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet." - report.resource_id = security_group.id - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There are no EC2 security groups." - report.region = region - + for security_group in ec2_client.security_groups: + public = False + report = Check_Report(self.metadata) + report.region = security_group.region + # Loop through every security group's ingress rule and check it + for ingress_rule in security_group.ingress_rules: + public = check_security_group(ingress_rule, "tcp", check_ports) + # Check + if public: + report.status = "FAIL" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports 1521 and 2483 open to the Internet." + report.resource_id = security_group.id + else: + report.status = "PASS" + report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports 1521 and 2483 open to the Internet." + report.resource_id = security_group.id findings.append(report) - return findings + return findings \ No newline at end of file diff --git a/providers/aws/services/ec2/ec2_service.py b/providers/aws/services/ec2/ec2_service.py index be0beead..81d45e65 100644 --- a/providers/aws/services/ec2/ec2_service.py +++ b/providers/aws/services/ec2/ec2_service.py @@ -13,18 +13,22 @@ class EC2: self.session = audit_info.audit_session self.audited_account = audit_info.audited_account self.regional_clients = generate_regional_clients(self.service, audit_info) + self.instances = [] self.__threading_call__(self.__describe_instances__) + self.security_groups = [] self.__threading_call__(self.__describe_security_groups__) + self.network_acls = [] self.__threading_call__(self.__describe_network_acls__) + self.snapshots = [] self.__threading_call__(self.__describe_snapshots__) - self.__threading_call__(self.__get_snapshot_public__) + self.__get_snapshot_public__() def __get_session__(self): return self.session def __threading_call__(self, call): threads = [] - for regional_client in self.regional_clients: + for regional_client in self.regional_clients.values(): threads.append(threading.Thread(target=call, args=(regional_client,))) for t in threads: t.start() @@ -37,7 +41,6 @@ class EC2: describe_instances_paginator = regional_client.get_paginator( "describe_instances" ) - instances = [] for page in describe_instances_paginator.paginate(): for reservation in page["Reservations"]: for instance in reservation["Instances"]: @@ -45,9 +48,10 @@ class EC2: "PublicDnsName" in instance and "PublicIpAddress" in instance ): - instances.append( + self.instances.append( Instance( instance["InstanceId"], + regional_client.region, instance["InstanceType"], instance["ImageId"], instance["LaunchTime"], @@ -58,9 +62,10 @@ class EC2: ) ) else: - instances.append( + self.instances.append( Instance( instance["InstanceId"], + regional_client.region, instance["InstanceType"], instance["ImageId"], instance["LaunchTime"], @@ -74,9 +79,6 @@ class EC2: logger.error( f"{regional_client.region} -- {error.__class__.__name__}: {error}" ) - regional_client.instances = [] - else: - regional_client.instances = instances def __describe_security_groups__(self, regional_client): logger.info("EC2 - Describing Security Groups...") @@ -84,12 +86,12 @@ class EC2: describe_security_groups_paginator = regional_client.get_paginator( "describe_security_groups" ) - security_groups = [] for page in describe_security_groups_paginator.paginate(): for sg in page["SecurityGroups"]: - security_groups.append( + self.security_groups.append( SecurityGroup( sg["GroupName"], + regional_client.region, sg["GroupId"], sg["IpPermissions"], sg["IpPermissionsEgress"], @@ -99,9 +101,6 @@ class EC2: logger.error( f"{regional_client.region} -- {error.__class__.__name__}: {error}" ) - regional_client.security_groups = [] - else: - regional_client.security_groups = security_groups def __describe_network_acls__(self, regional_client): logger.info("EC2 - Describing Security Groups...") @@ -109,19 +108,19 @@ class EC2: describe_network_acls_paginator = regional_client.get_paginator( "describe_network_acls" ) - network_acls = [] for page in describe_network_acls_paginator.paginate(): for nacl in page["NetworkAcls"]: - network_acls.append( - NetworkACL(nacl["NetworkAclId"], nacl["Entries"]) + self.network_acls.append( + NetworkACL( + nacl["NetworkAclId"], + regional_client.region, + nacl["Entries"], + ) ) except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}: {error}" ) - regional_client.network_acls = [] - else: - regional_client.network_acls = network_acls def __describe_snapshots__(self, regional_client): logger.info("EC2 - Describing Snapshots...") @@ -129,7 +128,6 @@ class EC2: describe_snapshots_paginator = regional_client.get_paginator( "describe_snapshots" ) - snapshots = [] encrypted = False for page in describe_snapshots_paginator.paginate( OwnerIds=[self.audited_account] @@ -137,36 +135,36 @@ class EC2: for snapshot in page["Snapshots"]: if snapshot["Encrypted"]: encrypted = True - snapshots.append(Snapshot(snapshot["SnapshotId"], encrypted)) + self.snapshots.append( + Snapshot( + snapshot["SnapshotId"], regional_client.region, encrypted + ) + ) except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}: {error}" ) - regional_client.snapshots = [] - else: - regional_client.snapshots = snapshots - def __get_snapshot_public__(self, regional_client): + def __get_snapshot_public__(self): logger.info("EC2 - Get snapshots encryption...") try: - if hasattr(regional_client, "snapshots"): - for snapshot in regional_client.snapshots: - snapshot_public = regional_client.describe_snapshot_attribute( - Attribute="createVolumePermission", SnapshotId=snapshot.id - ) - for permission in snapshot_public["CreateVolumePermissions"]: - if "Group" in permission: - if permission["Group"] == "all": - snapshot.public = True + for snapshot in self.snapshots: + regional_client = self.regional_clients[snapshot.region] + snapshot_public = regional_client.describe_snapshot_attribute( + Attribute="createVolumePermission", SnapshotId=snapshot.id + ) + for permission in snapshot_public["CreateVolumePermissions"]: + if "Group" in permission: + if permission["Group"] == "all": + snapshot.public = True except Exception as error: - logger.error( - f"{regional_client.region} -- {error.__class__.__name__}: {error}" - ) + logger.error(f"{error.__class__.__name__}: {error}") @dataclass class Instance: id: str + region: str type: str image_id: str launch_time: str @@ -178,6 +176,7 @@ class Instance: def __init__( self, id, + region, type, image_id, launch_time, @@ -187,6 +186,7 @@ class Instance: public_ip, ): self.id = id + self.region = region self.type = type self.image_id = image_id self.launch_time = launch_time @@ -199,11 +199,13 @@ class Instance: @dataclass class Snapshot: id: str + region: str encrypted: bool public: bool - def __init__(self, id, encrypted): + def __init__(self, id, region, encrypted): self.id = id + self.region = region self.encrypted = encrypted self.public = False @@ -211,12 +213,14 @@ class Snapshot: @dataclass class SecurityGroup: name: str + region: str id: str ingress_rules: list[dict] egress_rules: list[dict] - def __init__(self, name, id, ingress_rules, egress_rules): + def __init__(self, name, region, id, ingress_rules, egress_rules): self.name = name + self.region = region self.id = id self.ingress_rules = ingress_rules self.egress_rules = egress_rules @@ -227,8 +231,9 @@ class NetworkACL: id: str entries: list[dict] - def __init__(self, id, entries): + def __init__(self, id, region, entries): self.id = id + self.region = region self.entries = entries diff --git a/providers/aws/services/iam/iam_administrator_access_with_mfa/iam_administrator_access_with_mfa.py b/providers/aws/services/iam/iam_administrator_access_with_mfa/iam_administrator_access_with_mfa.py index cdcb9d98..f02b8b26 100644 --- a/providers/aws/services/iam/iam_administrator_access_with_mfa/iam_administrator_access_with_mfa.py +++ b/providers/aws/services/iam/iam_administrator_access_with_mfa/iam_administrator_access_with_mfa.py @@ -7,58 +7,50 @@ class iam_administrator_access_with_mfa(Check): findings = [] response = iam_client.groups - if response: - for group in response: - report = Check_Report(self.metadata) - report.resource_id = group.name - report.resource_arn = group.arn - report.region = iam_client.region - if group.attached_policies: - admin_policy = False - for group_policy in group.attached_policies: - if ( - group_policy["PolicyArn"] - == "arn:aws:iam::aws:policy/AdministratorAccess" - ): - admin_policy = True - # users in group are Administrators - if group.users: - for group_user in group.users: - for user in iam_client.credential_report: - if ( - user["user"] == group_user.name - and user["mfa_active"] == "false" - ): - report.status = "FAIL" - report.status_extended = f"Group {group.name} provides administrator access to User {group_user.name} with MFA disabled." - findings.append(report) - elif ( - user["user"] == group_user.name - and user["mfa_active"] == "true" - ): - report.status = "PASS" - report.status_extended = f"Group {group.name} provides administrator access to User {group_user.name} with MFA enabled." - findings.append(report) - else: - report.status = "PASS" - report.status_extended = f"Group {group.name} provides administrative access but does not have users." - findings.append(report) - if not admin_policy: - report.status = "PASS" - report.status_extended = ( - f"Group {group.name} provides non-administrative access." - ) - findings.append(report) - else: - report.status = "PASS" - report.status_extended = f"Group {group.name} has no policies." - findings.append(report) - - else: + for group in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM groups." + report.resource_id = group.name + report.resource_arn = group.arn report.region = iam_client.region - findings.append(report) + if group.attached_policies: + admin_policy = False + for group_policy in group.attached_policies: + if ( + group_policy["PolicyArn"] + == "arn:aws:iam::aws:policy/AdministratorAccess" + ): + admin_policy = True + # users in group are Administrators + if group.users: + for group_user in group.users: + for user in iam_client.credential_report: + if ( + user["user"] == group_user.name + and user["mfa_active"] == "false" + ): + report.status = "FAIL" + report.status_extended = f"Group {group.name} provides administrator access to User {group_user.name} with MFA disabled." + findings.append(report) + elif ( + user["user"] == group_user.name + and user["mfa_active"] == "true" + ): + report.status = "PASS" + report.status_extended = f"Group {group.name} provides administrator access to User {group_user.name} with MFA enabled." + findings.append(report) + else: + report.status = "PASS" + report.status_extended = f"Group {group.name} provides administrative access but does not have users." + findings.append(report) + if not admin_policy: + report.status = "PASS" + report.status_extended = ( + f"Group {group.name} provides non-administrative access." + ) + findings.append(report) + else: + report.status = "PASS" + report.status_extended = f"Group {group.name} has no policies." + findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_avoid_root_usage/iam_avoid_root_usage.py b/providers/aws/services/iam/iam_avoid_root_usage/iam_avoid_root_usage.py index b8d1a62a..23fc9997 100644 --- a/providers/aws/services/iam/iam_avoid_root_usage/iam_avoid_root_usage.py +++ b/providers/aws/services/iam/iam_avoid_root_usage/iam_avoid_root_usage.py @@ -11,51 +11,50 @@ class iam_avoid_root_usage(Check): findings = [] response = iam_client.credential_report - if response: - for user in response: - if user["user"] == "": - report = Check_Report(self.metadata) - report.region = iam_client.region - report.resource_id = user["user"] - report.resource_arn = user["arn"] - if ( - user["password_last_used"] != "no_information" - or user["access_key_1_last_used_date"] != "N/A" - or user["access_key_2_last_used_date"] != "N/A" - ): - if user["password_last_used"] != "no_information": - days_since_accessed = ( - datetime.datetime.now() - - datetime.datetime.strptime( - user["password_last_used"], - "%Y-%m-%dT%H:%M:%S+00:00", - ) - ).days - elif user["access_key_1_last_used_date"] != "N/A": - days_since_accessed = ( - datetime.datetime.now() - - datetime.datetime.strptime( - user["access_key_1_last_used_date"], - "%Y-%m-%dT%H:%M:%S+00:00", - ) - ).days - elif user["access_key_2_last_used_date"] != "N/A": - days_since_accessed = ( - datetime.datetime.now() - - datetime.datetime.strptime( - user["access_key_2_last_used_date"], - "%Y-%m-%dT%H:%M:%S+00:00", - ) - ).days - if days_since_accessed > maximum_access_days: - report.status = "FAIL" - report.status_extended = f"Root user in the account was last accessed {days_since_accessed} days ago." - else: - report.status = "PASS" - report.status_extended = f"Root user in the account wasn't accessed in the last {maximum_access_days} days." + for user in response: + if user["user"] == "": + report = Check_Report(self.metadata) + report.region = iam_client.region + report.resource_id = user["user"] + report.resource_arn = user["arn"] + if ( + user["password_last_used"] != "no_information" + or user["access_key_1_last_used_date"] != "N/A" + or user["access_key_2_last_used_date"] != "N/A" + ): + if user["password_last_used"] != "no_information": + days_since_accessed = ( + datetime.datetime.now() + - datetime.datetime.strptime( + user["password_last_used"], + "%Y-%m-%dT%H:%M:%S+00:00", + ) + ).days + elif user["access_key_1_last_used_date"] != "N/A": + days_since_accessed = ( + datetime.datetime.now() + - datetime.datetime.strptime( + user["access_key_1_last_used_date"], + "%Y-%m-%dT%H:%M:%S+00:00", + ) + ).days + elif user["access_key_2_last_used_date"] != "N/A": + days_since_accessed = ( + datetime.datetime.now() + - datetime.datetime.strptime( + user["access_key_2_last_used_date"], + "%Y-%m-%dT%H:%M:%S+00:00", + ) + ).days + if days_since_accessed > maximum_access_days: + report.status = "FAIL" + report.status_extended = f"Root user in the account was last accessed {days_since_accessed} days ago." else: report.status = "PASS" report.status_extended = f"Root user in the account wasn't accessed in the last {maximum_access_days} days." - findings.append(report) + else: + report.status = "PASS" + report.status_extended = f"Root user in the account wasn't accessed in the last {maximum_access_days} days." + findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py index 6f5424e5..11e13d37 100644 --- a/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py +++ b/providers/aws/services/iam/iam_disable_30_days_credentials/iam_disable_30_days_credentials.py @@ -11,42 +11,35 @@ class iam_disable_30_days_credentials(Check): findings = [] response = iam_client.users - if response: - for user in response: - report = Check_Report(self.metadata) - report.resource_id = user.name - report.resource_arn = user.arn - report.region = iam_client.region - if user.password_last_used and user.password_last_used != "": - try: - time_since_insertion = ( - datetime.datetime.now() - - datetime.datetime.strptime( - str(user.password_last_used), "%Y-%m-%d %H:%M:%S+00:00" - ) - ) - if time_since_insertion.days > maximum_expiration_days: - report.status = "FAIL" - report.status_extended = f"User {user.name} has not logged into the console in the past 30 days." - else: - report.status = "PASS" - report.status_extended = f"User {user.name} has logged into the console in the past 30 days." - - except KeyError: - pass - else: - report.status = "PASS" - report.status_extended = ( - f"User {user.name} has not a console password or is unused." - ) - - # Append report - findings.append(report) - else: + for user in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." + report.resource_id = user.name + report.resource_arn = user.arn report.region = iam_client.region + if user.password_last_used and user.password_last_used != "": + try: + time_since_insertion = ( + datetime.datetime.now() + - datetime.datetime.strptime( + str(user.password_last_used), "%Y-%m-%d %H:%M:%S+00:00" + ) + ) + if time_since_insertion.days > maximum_expiration_days: + report.status = "FAIL" + report.status_extended = f"User {user.name} has not logged into the console in the past 30 days." + else: + report.status = "PASS" + report.status_extended = f"User {user.name} has logged into the console in the past 30 days." + + except KeyError: + pass + else: + report.status = "PASS" + report.status_extended = ( + f"User {user.name} has not a console password or is unused." + ) + + # Append report findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py index 9c9c5f81..ac023145 100644 --- a/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py +++ b/providers/aws/services/iam/iam_disable_90_days_credentials/iam_disable_90_days_credentials.py @@ -11,42 +11,35 @@ class iam_disable_90_days_credentials(Check): findings = [] response = iam_client.users - if response: - for user in response: - report = Check_Report(self.metadata) - report.region = iam_client.region - report.resource_id = user.name - report.resource_arn = user.arn - if user.password_last_used and user.password_last_used != "": - try: - time_since_insertion = ( - datetime.datetime.now() - - datetime.datetime.strptime( - str(user.password_last_used), "%Y-%m-%d %H:%M:%S+00:00" - ) - ) - if time_since_insertion.days > maximum_expiration_days: - report.status = "FAIL" - report.status_extended = f"User {user.name} has not logged into the console in the past 90 days." - else: - report.status = "PASS" - report.status_extended = f"User {user.name} has logged into the console in the past 90 days." - - except KeyError: - pass - else: - report.status = "PASS" - - report.status_extended = ( - f"User {user.name} has not a console password or is unused." - ) - # Append report - findings.append(report) - else: + for user in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." report.region = iam_client.region + report.resource_id = user.name + report.resource_arn = user.arn + if user.password_last_used and user.password_last_used != "": + try: + time_since_insertion = ( + datetime.datetime.now() + - datetime.datetime.strptime( + str(user.password_last_used), "%Y-%m-%d %H:%M:%S+00:00" + ) + ) + if time_since_insertion.days > maximum_expiration_days: + report.status = "FAIL" + report.status_extended = f"User {user.name} has not logged into the console in the past 90 days." + else: + report.status = "PASS" + report.status_extended = f"User {user.name} has logged into the console in the past 90 days." + + except KeyError: + pass + else: + report.status = "PASS" + + report.status_extended = ( + f"User {user.name} has not a console password or is unused." + ) + # Append report findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_root_mfa_enabled/iam_root_mfa_enabled.py b/providers/aws/services/iam/iam_root_mfa_enabled/iam_root_mfa_enabled.py index 143821e3..a9aaea5b 100644 --- a/providers/aws/services/iam/iam_root_mfa_enabled/iam_root_mfa_enabled.py +++ b/providers/aws/services/iam/iam_root_mfa_enabled/iam_root_mfa_enabled.py @@ -6,19 +6,18 @@ class iam_root_mfa_enabled(Check): def execute(self) -> Check_Report: findings = [] - if iam_client.credential_report: - for user in iam_client.credential_report: - if user["user"] == "": - report = Check_Report(self.metadata) - report.region = iam_client.region - report.resource_id = user["user"] - report.resource_arn = user["arn"] - if user["mfa_active"] == "false": - report.status = "FAIL" - report.status_extended = "MFA is not enabled for root account." - else: - report.status = "PASS" - report.status_extended = "MFA is enabled for root account." - findings.append(report) + for user in iam_client.credential_report: + if user["user"] == "": + report = Check_Report(self.metadata) + report.region = iam_client.region + report.resource_id = user["user"] + report.resource_arn = user["arn"] + if user["mfa_active"] == "false": + report.status = "FAIL" + report.status_extended = "MFA is not enabled for root account." + else: + report.status = "PASS" + report.status_extended = "MFA is enabled for root account." + findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_rotate_access_key_90_days/iam_rotate_access_key_90_days.py b/providers/aws/services/iam/iam_rotate_access_key_90_days/iam_rotate_access_key_90_days.py index 2d859232..0f87e372 100644 --- a/providers/aws/services/iam/iam_rotate_access_key_90_days/iam_rotate_access_key_90_days.py +++ b/providers/aws/services/iam/iam_rotate_access_key_90_days/iam_rotate_access_key_90_days.py @@ -11,53 +11,48 @@ class iam_rotate_access_key_90_days(Check): findings = [] response = iam_client.credential_report - if response: - for user in response: - report = Check_Report(self.metadata) - report.region = iam_client.region - report.resource_id = user["user"] - report.resource_arn = user["arn"] - if ( - user["access_key_1_last_rotated"] == "N/A" - and user["access_key_2_last_rotated"] == "N/A" - ): - report.status = "PASS" - report.status_extended = f"User {user['user']} has not access keys." - else: - old_access_keys = False - if user["access_key_1_last_rotated"] != "N/A": - access_key_1_last_rotated = ( - datetime.datetime.now() - - datetime.datetime.strptime( - user["access_key_1_last_rotated"], - "%Y-%m-%dT%H:%M:%S+00:00", - ) - ) - if access_key_1_last_rotated.days > maximum_expiration_days: - old_access_keys = True - report.status = "FAIL" - report.status_extended = f"User {user['user']} has not rotated access key 1 in over 90 days ({access_key_1_last_rotated.days} days)." - if user["access_key_2_last_rotated"] != "N/A": - access_key_2_last_rotated = ( - datetime.datetime.now() - - datetime.datetime.strptime( - user["access_key_2_last_rotated"], - "%Y-%m-%dT%H:%M:%S+00:00", - ) - ) - if access_key_2_last_rotated.days > maximum_expiration_days: - old_access_keys = True - report.status = "FAIL" - report.status_extended = f"User {user['user']} has not rotated access key 2 in over 90 days ({access_key_2_last_rotated.days} days)." - if not old_access_keys: - report.status = "PASS" - report.status_extended = f"User {user['user']} has access keys not older than 90 days." - findings.append(report) - else: + for user in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." report.region = iam_client.region + report.resource_id = user["user"] + report.resource_arn = user["arn"] + if ( + user["access_key_1_last_rotated"] == "N/A" + and user["access_key_2_last_rotated"] == "N/A" + ): + report.status = "PASS" + report.status_extended = f"User {user['user']} has not access keys." + else: + old_access_keys = False + if user["access_key_1_last_rotated"] != "N/A": + access_key_1_last_rotated = ( + datetime.datetime.now() + - datetime.datetime.strptime( + user["access_key_1_last_rotated"], + "%Y-%m-%dT%H:%M:%S+00:00", + ) + ) + if access_key_1_last_rotated.days > maximum_expiration_days: + old_access_keys = True + report.status = "FAIL" + report.status_extended = f"User {user['user']} has not rotated access key 1 in over 90 days ({access_key_1_last_rotated.days} days)." + if user["access_key_2_last_rotated"] != "N/A": + access_key_2_last_rotated = ( + datetime.datetime.now() + - datetime.datetime.strptime( + user["access_key_2_last_rotated"], + "%Y-%m-%dT%H:%M:%S+00:00", + ) + ) + if access_key_2_last_rotated.days > maximum_expiration_days: + old_access_keys = True + report.status = "FAIL" + report.status_extended = f"User {user['user']} has not rotated access key 2 in over 90 days ({access_key_2_last_rotated.days} days)." + if not old_access_keys: + report.status = "PASS" + report.status_extended = ( + f"User {user['user']} has access keys not older than 90 days." + ) findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_user_hardware_mfa_enabled/iam_user_hardware_mfa_enabled.py b/providers/aws/services/iam/iam_user_hardware_mfa_enabled/iam_user_hardware_mfa_enabled.py index ec5332cb..92d45268 100644 --- a/providers/aws/services/iam/iam_user_hardware_mfa_enabled/iam_user_hardware_mfa_enabled.py +++ b/providers/aws/services/iam/iam_user_hardware_mfa_enabled/iam_user_hardware_mfa_enabled.py @@ -7,35 +7,28 @@ class iam_user_hardware_mfa_enabled(Check): findings = [] response = iam_client.users - if response: - for user in response: - report = Check_Report(self.metadata) - report.resource_id = user.name - report.resource_arn = user.arn - report.region = iam_client.region - if user.mfa_devices: - for mfa_device in user.mfa_devices: - if mfa_device.type == "mfa" or mfa_device.type == "sms-mfa": - report.status = "FAIL" - report.status_extended = f"User {user.name} has a virtual MFA instead of a hardware MFA enabled." - findings.append(report) - else: - report.status = "PASS" - report.status_extended = ( - f"User {user.name} has hardware MFA enabled." - ) - findings.append(report) - else: - report.status = "FAIL" - report.status_extended = ( - f"User {user.name} has not any type of MFA enabled." - ) - findings.append(report) - else: + for user in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." + report.resource_id = user.name + report.resource_arn = user.arn report.region = iam_client.region - findings.append(report) + if user.mfa_devices: + for mfa_device in user.mfa_devices: + if mfa_device.type == "mfa" or mfa_device.type == "sms-mfa": + report.status = "FAIL" + report.status_extended = f"User {user.name} has a virtual MFA instead of a hardware MFA enabled." + findings.append(report) + else: + report.status = "PASS" + report.status_extended = ( + f"User {user.name} has hardware MFA enabled." + ) + findings.append(report) + else: + report.status = "FAIL" + report.status_extended = ( + f"User {user.name} has not any type of MFA enabled." + ) + findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_user_mfa_enabled_console_access/iam_user_mfa_enabled_console_access.py b/providers/aws/services/iam/iam_user_mfa_enabled_console_access/iam_user_mfa_enabled_console_access.py index 9d6e004f..e8e16527 100644 --- a/providers/aws/services/iam/iam_user_mfa_enabled_console_access/iam_user_mfa_enabled_console_access.py +++ b/providers/aws/services/iam/iam_user_mfa_enabled_console_access/iam_user_mfa_enabled_console_access.py @@ -6,31 +6,23 @@ class iam_user_mfa_enabled_console_access(Check): def execute(self) -> Check_Report: findings = [] response = iam_client.credential_report - - if response: - for user in response: - report = Check_Report(self.metadata) - report.resource_id = user["user"] - report.resource_arn = user["arn"] - report.region = iam_client.region - if user["password_enabled"] != "not_supported": - if user["mfa_active"] == "false": - report.status = "FAIL" - report.status_extended = f"User {user['user']} has Console Password enabled but MFA disabled." - else: - report.status = "PASS" - report.status_extended = f"User {user['user']} has Console Password enabled and MFA enabled." + for user in response: + report = Check_Report(self.metadata) + report.resource_id = user["user"] + report.resource_arn = user["arn"] + report.region = iam_client.region + if user["password_enabled"] != "not_supported": + if user["mfa_active"] == "false": + report.status = "FAIL" + report.status_extended = f"User {user['user']} has Console Password enabled but MFA disabled." else: report.status = "PASS" - report.status_extended = ( - f"User {user['user']} has not Console Password enabled." - ) - findings.append(report) - else: - report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." - report.region = iam_client.region + report.status_extended = f"User {user['user']} has Console Password enabled and MFA enabled." + else: + report.status = "PASS" + report.status_extended = ( + f"User {user['user']} has not Console Password enabled." + ) findings.append(report) return findings diff --git a/providers/aws/services/iam/iam_user_two_active_access_key/iam_user_two_active_access_key.py b/providers/aws/services/iam/iam_user_two_active_access_key/iam_user_two_active_access_key.py index a86d86c1..b6e59917 100644 --- a/providers/aws/services/iam/iam_user_two_active_access_key/iam_user_two_active_access_key.py +++ b/providers/aws/services/iam/iam_user_two_active_access_key/iam_user_two_active_access_key.py @@ -6,33 +6,24 @@ class iam_user_two_active_access_key(Check): def execute(self) -> Check_Report: findings = [] response = iam_client.credential_report - - if response: - for user in response: - report = Check_Report(self.metadata) - report.resource_id = user["user"] - report.resource_arn = user["arn"] - report.region = iam_client.region - if ( - user["access_key_1_active"] == "true" - and user["access_key_2_active"] == "true" - ): - report.status = "FAIL" - report.status_extended = ( - f"User {user['user']} has 2 active access keys." - ) - findings.append(report) - else: - report.status = "PASS" - report.status_extended = ( - f"User {user['user']} has not 2 active access keys." - ) - findings.append(report) - else: + for user in response: report = Check_Report(self.metadata) - report.status = "PASS" - report.status_extended = "There is no IAM users." + report.resource_id = user["user"] + report.resource_arn = user["arn"] report.region = iam_client.region + if ( + user["access_key_1_active"] == "true" + and user["access_key_2_active"] == "true" + ): + report.status = "FAIL" + report.status_extended = ( + f"User {user['user']} has 2 active access keys." + ) + else: + report.status = "PASS" + report.status_extended = ( + f"User {user['user']} has not 2 active access keys." + ) findings.append(report) return findings diff --git a/providers/aws/services/s3/s3_bucket_object_versioning/s3_bucket_object_versioning.py b/providers/aws/services/s3/s3_bucket_object_versioning/s3_bucket_object_versioning.py index 17276cb4..1cfa5051 100644 --- a/providers/aws/services/s3/s3_bucket_object_versioning/s3_bucket_object_versioning.py +++ b/providers/aws/services/s3/s3_bucket_object_versioning/s3_bucket_object_versioning.py @@ -5,29 +5,20 @@ from providers.aws.services.s3.s3_service import s3_client class s3_bucket_object_versioning(Check): def execute(self): findings = [] - for regional_client in s3_client.regional_clients: - region = regional_client.region - if regional_client.buckets: - for bucket in regional_client.buckets: - report = Check_Report(self.metadata) - report.region = region - report.resource_id = bucket.name - if bucket.versioning: - report.status = "PASS" - report.status_extended = ( - f"S3 Bucket {bucket.name} has versioning enabled." - ) - else: - report.status = "FAIL" - report.status_extended = ( - f"S3 Bucket {bucket.name} has versioning disabled." - ) - findings.append(report) - else: - report = Check_Report(self.metadata) + for bucket in s3_client.buckets: + report = Check_Report(self.metadata) + report.region = bucket.region + report.resource_id = bucket.name + if bucket.versioning: report.status = "PASS" - report.status_extended = "There are no S3 buckets." - report.region = region - findings.append(report) + report.status_extended = ( + f"S3 Bucket {bucket.name} has versioning enabled." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"S3 Bucket {bucket.name} has versioning disabled." + ) + findings.append(report) return findings diff --git a/providers/aws/services/s3/s3_bucket_server_access_logging_enabled/s3_bucket_server_access_logging_enabled.py b/providers/aws/services/s3/s3_bucket_server_access_logging_enabled/s3_bucket_server_access_logging_enabled.py index c21a8480..f233c358 100644 --- a/providers/aws/services/s3/s3_bucket_server_access_logging_enabled/s3_bucket_server_access_logging_enabled.py +++ b/providers/aws/services/s3/s3_bucket_server_access_logging_enabled/s3_bucket_server_access_logging_enabled.py @@ -5,25 +5,20 @@ from providers.aws.services.s3.s3_service import s3_client class s3_bucket_server_access_logging_enabled(Check): def execute(self): findings = [] - for regional_client in s3_client.regional_clients: - region = regional_client.region - if regional_client.buckets: - for bucket in regional_client.buckets: - report = Check_Report(self.metadata) - report.region = region - report.resource_id = bucket.name - if bucket.logging: - report.status = "PASS" - report.status_extended = f"S3 Bucket {bucket.name} has server access logging enabled." - else: - report.status = "FAIL" - report.status_extended = f"S3 Bucket {bucket.name} has server access logging disabled." - findings.append(report) - else: - report = Check_Report(self.metadata) + for bucket in s3_client.buckets: + report = Check_Report(self.metadata) + report.region = bucket.region + report.resource_id = bucket.name + if bucket.logging: report.status = "PASS" - report.status_extended = "There are no S3 buckets." - report.region = region - findings.append(report) + report.status_extended = ( + f"S3 Bucket {bucket.name} has server access logging enabled." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"S3 Bucket {bucket.name} has server access logging disabled." + ) + findings.append(report) return findings diff --git a/providers/aws/services/s3/s3_service.py b/providers/aws/services/s3/s3_service.py index 7d5ebb80..d81b440c 100644 --- a/providers/aws/services/s3/s3_service.py +++ b/providers/aws/services/s3/s3_service.py @@ -10,9 +10,10 @@ class S3: def __init__(self, audit_info): self.service = "s3" self.session = audit_info.audit_session + self.client = self.session.client(self.service) self.audited_account = audit_info.audited_account self.regional_clients = generate_regional_clients(self.service, audit_info) - self.__threading_call__(self.__list_buckets__) + self.buckets = self.__list_buckets__() self.__threading_call__(self.__get_bucket_versioning__) self.__threading_call__(self.__get_bucket_logging__) @@ -21,64 +22,50 @@ class S3: def __threading_call__(self, call): threads = [] - for regional_client in self.regional_clients: - threads.append(threading.Thread(target=call, args=(regional_client,))) + for bucket in self.buckets: + threads.append(threading.Thread(target=call, args=(bucket,))) for t in threads: t.start() for t in threads: t.join() - def __list_buckets__(self, regional_client): + def __list_buckets__(self): logger.info("S3 - Listing buckets...") try: - list_buckets = regional_client.list_buckets() buckets = [] + list_buckets = self.client.list_buckets() for bucket in list_buckets["Buckets"]: - try: - bucket_region = regional_client.get_bucket_location( - Bucket=bucket["Name"] - )["LocationConstraint"] - if regional_client.region == bucket_region or ( - regional_client.region == "us-east-1" and not bucket_region - ): # If us-east-1, bucket_region is none - buckets.append(Bucket(bucket["Name"])) - except Exception as error: - if error.__class__.__name__ != "NoSuchBucket": - logger.error( - f"{regional_client.region} -- {error.__class__.__name__}: {error}" - ) - regional_client.buckets = buckets + bucket_region = self.client.get_bucket_location(Bucket=bucket["Name"])[ + "LocationConstraint" + ] + if not bucket_region: # If us-east-1, bucket_region is none + buckets.append(Bucket(bucket["Name"], "us-east-1")) + else: + buckets.append(Bucket(bucket["Name"], bucket_region)) + return buckets except Exception as error: - logger.error( - f"{regional_client.region} -- {error.__class__.__name__}: {error}" - ) + logger.error(f"{bucket_region} -- {error.__class__.__name__}: {error}") - def __get_bucket_versioning__(self, regional_client): + def __get_bucket_versioning__(self, bucket): logger.info("S3 - Get buckets versioning...") try: - if hasattr(regional_client, "buckets"): - for bucket in regional_client.buckets: - bucket_versioning = regional_client.get_bucket_versioning( - Bucket=bucket.name - ) - if "Status" in bucket_versioning: - if "Enabled" == bucket_versioning["Status"]: - bucket.versioning = True - except Exception as error: - logger.error( - f"{regional_client.region} -- {error.__class__.__name__}: {error}" + regional_client = self.regional_clients[bucket.region] + bucket_versioning = regional_client.get_bucket_versioning( + Bucket=bucket.name ) + if "Status" in bucket_versioning: + if "Enabled" == bucket_versioning["Status"]: + bucket.versioning = True + except Exception as error: + logger.error(f"{bucket.region} -- {error.__class__.__name__}: {error}") - def __get_bucket_logging__(self, regional_client): + def __get_bucket_logging__(self, bucket): logger.info("S3 - Get buckets logging...") try: - if hasattr(regional_client, "buckets"): - for bucket in regional_client.buckets: - bucket_logging = regional_client.get_bucket_logging( - Bucket=bucket.name - ) - if "LoggingEnabled" in bucket_logging: - bucket.logging = True + regional_client = self.regional_clients[bucket.region] + bucket_logging = regional_client.get_bucket_logging(Bucket=bucket.name) + if "LoggingEnabled" in bucket_logging: + bucket.logging = True except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}: {error}" @@ -90,11 +77,13 @@ class Bucket: name: str versioning: bool logging: bool + region: str - def __init__(self, name): + def __init__(self, name, region): self.name = name self.versioning = False self.logging = False + self.region = region s3_client = S3(current_audit_info)