fix(get_checks_from_input_arn): fix logic and add tests (#2764)

This commit is contained in:
Nacho Rivera
2023-08-23 09:35:42 +02:00
committed by GitHub
parent e5d2c0c700
commit 83bfd8a2d4
2 changed files with 191 additions and 2 deletions

View File

@@ -195,6 +195,7 @@ def get_aws_available_regions():
def get_checks_from_input_arn(audit_resources: list, provider: str) -> set:
"""get_checks_from_input_arn gets the list of checks from the input arns"""
checks_from_arn = set()
is_subservice_in_checks = False
# Handle if there are audit resources so only their services are executed
if audit_resources:
services_without_subservices = ["guardduty", "kms", "s3", "elb", "efs"]
@@ -246,8 +247,10 @@ def get_checks_from_input_arn(audit_resources: list, provider: str) -> set:
if any(sub_service in check for sub_service in sub_service_list):
if not (sub_service == "policy" and "password_policy" in check):
checks_from_arn.add(check)
else:
checks_from_arn.add(check)
is_subservice_in_checks = True
if not is_subservice_in_checks:
checks_from_arn = checks
# Return final checks list
return sorted(checks_from_arn)

View File

@@ -138,6 +138,40 @@ def mock_recover_checks_from_aws_provider_lambda_service(*_):
]
def mock_recover_checks_from_aws_provider_elb_service(*_):
return [
(
"elb_insecure_ssl_ciphers",
"/root_dir/fake_path/elb/elb_insecure_ssl_ciphers",
),
(
"elb_internet_facing",
"/root_dir/fake_path/elb/elb_internet_facing",
),
(
"elb_logging_enabled",
"/root_dir/fake_path/elb/elb_logging_enabled",
),
]
def mock_recover_checks_from_aws_provider_efs_service(*_):
return [
(
"efs_encryption_at_rest_enabled",
"/root_dir/fake_path/efs/efs_encryption_at_rest_enabled",
),
(
"efs_have_backup_enabled",
"/root_dir/fake_path/efs/efs_have_backup_enabled",
),
(
"efs_not_publicly_accessible",
"/root_dir/fake_path/efs/efs_not_publicly_accessible",
),
]
def mock_recover_checks_from_aws_provider_iam_service(*_):
return [
(
@@ -172,6 +206,57 @@ def mock_recover_checks_from_aws_provider_s3_service(*_):
]
def mock_recover_checks_from_aws_provider_cloudwatch_service(*_):
return [
(
"cloudwatch_changes_to_network_acls_alarm_configured",
"/root_dir/fake_path/cloudwatch/cloudwatch_changes_to_network_acls_alarm_configured",
),
(
"cloudwatch_changes_to_network_gateways_alarm_configured",
"/root_dir/cloudwatch/cloudwatch_changes_to_network_gateways_alarm_configured",
),
(
"cloudwatch_changes_to_network_route_tables_alarm_configured",
"/root_dir/fake_path/cloudwatch/cloudwatch_changes_to_network_route_tables_alarm_configured",
),
]
def mock_recover_checks_from_aws_provider_ec2_service(*_):
return [
(
"ec2_securitygroup_allow_ingress_from_internet_to_any_port",
"/root_dir/fake_path/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port",
),
(
"ec2_networkacl_allow_ingress_any_port",
"/root_dir/fake_path/ec2/ec2_networkacl_allow_ingress_any_port",
),
(
"ec2_ami_public",
"/root_dir/fake_path/ec2/ec2_ami_public",
),
]
def mock_recover_checks_from_aws_provider_rds_service(*_):
return [
(
"rds_instance_backup_enabled",
"/root_dir/fake_path/rds/rds_instance_backup_enabled",
),
(
"rds_instance_deletion_protection",
"/root_dir/fake_path/rds/rds_instance_deletion_protection",
),
(
"rds_snapshots_public_access",
"/root_dir/fake_path/rds/rds_snapshots_public_access",
),
]
class Test_Check:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
@@ -415,6 +500,40 @@ class Test_Check:
recovered_checks = recover_checks_from_service(service_list, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_elb_service,
)
def test_get_checks_from_input_arn_elb(self):
audit_resources = [
f"arn:aws:elasticloadbalancing:us-east-1:{AWS_ACCOUNT_NUMBER}:loadbalancer/test"
]
provider = "aws"
expected_checks = [
"elb_insecure_ssl_ciphers",
"elb_internet_facing",
"elb_logging_enabled",
]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_efs_service,
)
def test_get_checks_from_input_arn_efs(self):
audit_resources = [
f"arn:aws:elasticfilesystem:us-east-1:{AWS_ACCOUNT_NUMBER}:file-system/fs-01234567"
]
provider = "aws"
expected_checks = [
"efs_encryption_at_rest_enabled",
"efs_have_backup_enabled",
"efs_not_publicly_accessible",
]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_lambda_service,
@@ -460,6 +579,73 @@ class Test_Check:
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_cloudwatch_service,
)
def test_get_checks_from_input_arn_cloudwatch(self):
audit_resources = [
f"arn:aws:logs:us-east-1:{AWS_ACCOUNT_NUMBER}:destination:testDestination"
]
provider = "aws"
expected_checks = [
"cloudwatch_changes_to_network_acls_alarm_configured",
"cloudwatch_changes_to_network_gateways_alarm_configured",
"cloudwatch_changes_to_network_route_tables_alarm_configured",
]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_ec2_service,
)
def test_get_checks_from_input_arn_ec2_security_group(self):
audit_resources = [
f"arn:aws:ec2:us-east-1:{AWS_ACCOUNT_NUMBER}:security-group/sg-1111111111"
]
provider = "aws"
expected_checks = ["ec2_securitygroup_allow_ingress_from_internet_to_any_port"]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_ec2_service,
)
def test_get_checks_from_input_arn_ec2_acl(self):
audit_resources = [
f"arn:aws:ec2:us-west-2:{AWS_ACCOUNT_NUMBER}:network-acl/acl-1"
]
provider = "aws"
expected_checks = ["ec2_networkacl_allow_ingress_any_port"]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_rds_service,
)
def test_get_checks_from_input_arn_rds_snapshots(self):
audit_resources = [
f"arn:aws:rds:us-east-2:{AWS_ACCOUNT_NUMBER}:snapshot:rds:snapshot-1"
]
provider = "aws"
expected_checks = ["rds_snapshots_public_access"]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
@patch(
"prowler.lib.check.check.recover_checks_from_provider",
new=mock_recover_checks_from_aws_provider_ec2_service,
)
def test_get_checks_from_input_arn_ec2_ami(self):
audit_resources = [f"arn:aws:ec2:us-west-2:{AWS_ACCOUNT_NUMBER}:image/ami-1"]
provider = "aws"
expected_checks = ["ec2_ami_public"]
recovered_checks = get_checks_from_input_arn(audit_resources, provider)
assert recovered_checks == expected_checks
def test_get_regions_from_audit_resources(self):
audit_resources = [
f"arn:aws:lambda:us-east-1:{AWS_ACCOUNT_NUMBER}:function:test-lambda",