feat(gcp): add 12 new checks for CIS Framework (#2426)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
This commit is contained in:
Jit
2023-06-08 10:25:51 +01:00
committed by GitHub
parent 414a45bfb0
commit b73da9c54c
67 changed files with 3171 additions and 124 deletions

View File

@@ -0,0 +1,135 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_apikeys_api_restrictions_configured:
def test_apikeys_no_keys(self):
apikeys_client = mock.MagicMock
apikeys_client.keys = []
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured import (
apikeys_api_restrictions_configured,
)
check = apikeys_api_restrictions_configured()
result = check.execute()
assert len(result) == 0
def test_one_compliant_key(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2023-06-01T11:21:41.627509Z",
restrictions={
"apiTargets": [
{"service": "dns.googleapis.com"},
{"service": "oslogin.googleapis.com"},
]
},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured import (
apikeys_api_restrictions_configured,
)
check = apikeys_api_restrictions_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"API key {key.name} have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
def test_one_key_without_restrictions(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2022-06-05T11:21:41.627509Z",
restrictions={},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured import (
apikeys_api_restrictions_configured,
)
check = apikeys_api_restrictions_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
def test_one_key_with_cloudapis_restriction(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2022-06-05T11:21:41.627509Z",
restrictions={
"apiTargets": [
{"service": "dns.googleapis.com"},
{"service": "oslogin.googleapis.com"},
{"service": "cloudapis.googleapis.com"},
]
},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured import (
apikeys_api_restrictions_configured,
)
check = apikeys_api_restrictions_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id

View File

@@ -0,0 +1,90 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_apikeys_key_rotated_in_90_days:
def test_apikeys_no_keys(self):
apikeys_client = mock.MagicMock
apikeys_client.keys = []
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days import (
apikeys_key_rotated_in_90_days,
)
check = apikeys_key_rotated_in_90_days()
result = check.execute()
assert len(result) == 0
def test_one_compliant_key(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2023-06-01T11:21:41.627509Z",
restrictions={},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days import (
apikeys_key_rotated_in_90_days,
)
check = apikeys_key_rotated_in_90_days()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"API key {key.name} created in less than 90 days.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
def test_one_key_with_more_than_90_days(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2022-06-05T11:21:41.627509Z",
restrictions={},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days import (
apikeys_key_rotated_in_90_days,
)
check = apikeys_key_rotated_in_90_days()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} creation date have more than 90 days.",
result[0].status_extended,
)
assert result[0].resource_id == key.id

View File

@@ -0,0 +1,143 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_block_project_wide_ssh_keys_disabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_instance_with_block_project_ssh_keys_true(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={"items": [{"key": "block-project-ssh-keys", "value": "true"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_one_instance_without_metadata(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} is making use of common/shared project-wide SSH key",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_one_instance_with_block_project_ssh_keys_false(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={"items": [{"key": "block-project-ssh-keys", "value": "false"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} is making use of common/shared project-wide SSH key",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -32,6 +32,8 @@ class Test_compute_default_service_account_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
service_accounts=[{"email": "custom@developer.gserviceaccount.com"}],
project_id=GCP_PROJECT_ID,
)
@@ -73,6 +75,8 @@ class Test_compute_default_service_account_in_use:
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
@@ -113,6 +117,8 @@ class Test_compute_default_service_account_in_use:
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)

View File

@@ -35,6 +35,8 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
service_accounts=[
{"email": "123-compute@developer.gserviceaccount.com", "scopes": []}
],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
@@ -78,6 +80,8 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
"scopes": ["https://www.googleapis.com/auth/cloud-platform"],
}
],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
@@ -121,6 +125,8 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
"scopes": ["https://www.googleapis.com/auth/cloud-platform"],
}
],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)

View File

@@ -0,0 +1,143 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_encryption_with_csek_is_disabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
)
check = compute_encryption_with_csek_is_disabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_instance_with_all_encrypted_disks(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={"items": [{"key": "block-project-ssh-keys", "value": "true"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", True), ("disk2", True)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
)
check = compute_encryption_with_csek_is_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The VM Instance {instance.name} have every disk encrypted.",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_one_instance_with_one_unecrypted_disk(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", True)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
)
check = compute_encryption_with_csek_is_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} have the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_one_instance_with_all_unencrypted_disks(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={"items": [{"key": "block-project-ssh-keys", "value": "false"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
)
check = compute_encryption_with_csek_is_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} have the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -0,0 +1,146 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_ip_forwarding_is_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_instance(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[{"email": "123-compute@developer.gserviceaccount.com"}],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The IP Forwarding of VM Instance {instance.name} is not enabled",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_one_compliant_instance_gke(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="gke-test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
ip_forward=True,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The IP Forwarding of VM Instance {instance.name} is not enabled",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
def test_instance_with_ip_forwarding_enabled(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
ip_forward=True,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The IP Forwarding of VM Instance {instance.name} is enabled",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -0,0 +1,414 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_rdp_access_from_the_internet_allowed:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.firewalls = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 0
def test_one_compliant_rule_with_valid_port(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["443"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_port_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["3300-3380"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_source_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["127.0.0.1/32"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["3389"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_protocol(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "udp", "ports": ["3389"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_direction(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="EGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["3389"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_rule_with_single_port(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["3389"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_rule_with_port_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["3380-3390"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_all_ports_allowed(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp"}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_all_protocols_allowed(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "all"}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_2_rules(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[
{"IPProtocol": "udp", "ports": ["3389"]},
{"IPProtocol": "all"},
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_with_3_rules(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[
{"IPProtocol": "udp", "ports": ["3389"]},
{"IPProtocol": "tcp", "ports": ["23"]},
{"IPProtocol": "udp"},
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 3389",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id

View File

@@ -7,7 +7,7 @@ GCP_PROJECT_ID = "123456789012"
class Test_compute_serial_ports_in_use:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
@@ -34,11 +34,13 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -72,11 +74,13 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -110,11 +114,13 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -148,11 +154,13 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -186,11 +194,13 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(

View File

@@ -34,6 +34,8 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
@@ -72,6 +74,8 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=False,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
@@ -110,6 +114,8 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=False,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)

View File

@@ -0,0 +1,414 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_ssh_access_from_the_internet_allowed:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.firewalls = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 0
def test_one_compliant_rule_with_valid_port(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["443"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_port_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["1-20"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_source_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["127.0.0.1/32"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["22"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_protocol(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "udp", "ports": ["22"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_rule_with_valid_direction(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="EGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["22"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_rule_with_single_port(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["22"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_rule_with_port_range(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp", "ports": ["20-443"]}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_all_ports_allowed(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "tcp"}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_all_protocols_allowed(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[{"IPProtocol": "all"}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_non_compliant_with_2_rules(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[
{"IPProtocol": "udp", "ports": ["22"]},
{"IPProtocol": "all"},
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Firewall {firewall.name} does exposes port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id
def test_one_compliant_with_3_rules(self):
from prowler.providers.gcp.services.compute.compute_service import Firewall
firewall = Firewall(
name="test",
id="1234567890",
source_ranges=["0.0.0.0/0"],
direction="INGRESS",
allowed_rules=[
{"IPProtocol": "udp", "ports": ["22"]},
{"IPProtocol": "tcp", "ports": ["23"]},
{"IPProtocol": "udp"},
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Firewall {firewall.name} does not expose port 22",
result[0].status_extended,
)
assert result[0].resource_id == firewall.id

View File

@@ -0,0 +1,88 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_dataproc_encrypted_with_cmks_disabled:
def test_dataproc_no_clsuters(self):
dataproc_client = mock.MagicMock
dataproc_client.clusters = []
with mock.patch(
"prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled.dataproc_client",
new=dataproc_client,
):
from prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled import (
dataproc_encrypted_with_cmks_disabled,
)
check = dataproc_encrypted_with_cmks_disabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_cluster(self):
from prowler.providers.gcp.services.dataproc.dataproc_service import Cluster
cluster = Cluster(
name="test",
id="1234567890",
encryption_config={"gcePdKmsKeyName": "test"},
project_id=GCP_PROJECT_ID,
)
dataproc_client = mock.MagicMock
dataproc_client.project_ids = [GCP_PROJECT_ID]
dataproc_client.clusters = [cluster]
with mock.patch(
"prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled.dataproc_client",
new=dataproc_client,
):
from prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled import (
dataproc_encrypted_with_cmks_disabled,
)
check = dataproc_encrypted_with_cmks_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Dataproc cluster {cluster.name} is encrypted with customer managed encryption keys.",
result[0].status_extended,
)
assert result[0].resource_id == cluster.id
def test_cluster_without_encryption(self):
from prowler.providers.gcp.services.dataproc.dataproc_service import Cluster
cluster = Cluster(
name="test",
id="1234567890",
encryption_config={},
project_id=GCP_PROJECT_ID,
)
dataproc_client = mock.MagicMock
dataproc_client.project_ids = [GCP_PROJECT_ID]
dataproc_client.clusters = [cluster]
with mock.patch(
"prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled.dataproc_client",
new=dataproc_client,
):
from prowler.providers.gcp.services.dataproc.dataproc_encrypted_with_cmks_disabled.dataproc_encrypted_with_cmks_disabled import (
dataproc_encrypted_with_cmks_disabled,
)
check = dataproc_encrypted_with_cmks_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Dataproc cluster {cluster.name} is not encrypted with customer managed encryption keys.",
result[0].status_extended,
)
assert result[0].resource_id == cluster.id

View File

@@ -0,0 +1,116 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_dns_dnssec_disabled:
def test_dns_no_managed_zones(self):
dns_client = mock.MagicMock
dns_client.managed_zones = []
with mock.patch(
"prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled import (
dns_dnssec_disabled,
)
check = dns_dnssec_disabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_managed_zone(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=True,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha1",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha1",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled import (
dns_dnssec_disabled,
)
check = dns_dnssec_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud DNS {managed_zone.name} have DNSSEC enabled.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id
def test_managed_zone_with_dnssec_disabled(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=False,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha1",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha1",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_dnssec_disabled.dns_dnssec_disabled import (
dns_dnssec_disabled,
)
check = dns_dnssec_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud DNS {managed_zone.name} doens't have DNSSEC enabled.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id

View File

@@ -0,0 +1,116 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_dns_rsasha1_in_use_to_key_sign_in_dnssec:
def test_dns_no_managed_zones(self):
dns_client = mock.MagicMock
dns_client.managed_zones = []
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec import (
dns_rsasha1_in_use_to_key_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_key_sign_in_dnssec()
result = check.execute()
assert len(result) == 0
def test_one_compliant_managed_zone(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=True,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha256",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha1",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec import (
dns_rsasha1_in_use_to_key_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_key_sign_in_dnssec()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud DNS {managed_zone.name} is not using RSASHA1 algorithm as key signing.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id
def test_managed_zone_with_rsasha1_key_sign(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=False,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha1",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha256",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_key_sign_in_dnssec.dns_rsasha1_in_use_to_key_sign_in_dnssec import (
dns_rsasha1_in_use_to_key_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_key_sign_in_dnssec()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud DNS {managed_zone.name} is using RSASHA1 algorithm as key signing.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id

View File

@@ -0,0 +1,116 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_dns_rsasha1_in_use_to_zone_sign_in_dnssec:
def test_dns_no_managed_zones(self):
dns_client = mock.MagicMock
dns_client.managed_zones = []
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec import (
dns_rsasha1_in_use_to_zone_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_zone_sign_in_dnssec()
result = check.execute()
assert len(result) == 0
def test_one_compliant_managed_zone(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=True,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha1",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha256",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec import (
dns_rsasha1_in_use_to_zone_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_zone_sign_in_dnssec()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud DNS {managed_zone.name} is not using RSASHA1 algorithm as zone signing.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id
def test_managed_zone_with_dnssec_disabled(self):
from prowler.providers.gcp.services.dns.dns_service import ManagedZone
managed_zone = ManagedZone(
name="test",
id="1234567890",
dnssec=False,
key_specs=[
{
"keyType": "keySigning",
"algorithm": "rsasha256",
"keyLength": 2048,
"kind": "dns#dnsKeySpec",
},
{
"keyType": "zoneSigning",
"algorithm": "rsasha1",
"keyLength": 1024,
"kind": "dns#dnsKeySpec",
},
],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.managed_zones = [managed_zone]
with mock.patch(
"prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.dns.dns_rsasha1_in_use_to_zone_sign_in_dnssec.dns_rsasha1_in_use_to_zone_sign_in_dnssec import (
dns_rsasha1_in_use_to_zone_sign_in_dnssec,
)
check = dns_rsasha1_in_use_to_zone_sign_in_dnssec()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud DNS {managed_zone.name} is using RSASHA1 algorithm as zone signing.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id

View File

@@ -0,0 +1,141 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_iam_no_service_roles_at_project_level:
def test_iam_no_bindings(self):
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.bindings = []
with mock.patch(
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
iam_no_service_roles_at_project_level,
)
check = iam_no_service_roles_at_project_level()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"No IAM Users assigned to service roles at project level",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID
def test_three_compliant_binding(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/cloudfunctions.serviceAgent",
members=[["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"]],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/compute.serviceAgent",
members=[["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"]],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=[["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"]],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
with mock.patch(
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
iam_no_service_roles_at_project_level,
)
check = iam_no_service_roles_at_project_level()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"No IAM Users assigned to service roles at project level",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
def test_binding_with_service_account_user(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding = Binding(
role="roles/iam.serviceAccountUser",
members=[["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"]],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding]
with mock.patch(
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
iam_no_service_roles_at_project_level,
)
check = iam_no_service_roles_at_project_level()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"IAM Users assigned to service role '{binding.role}' at project level",
result[0].status_extended,
)
assert result[0].resource_id == binding.role
def test_binding_with_service_account_token_creator(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding = Binding(
role="roles/iam.serviceAccountTokenCreator",
members=[["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"]],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding]
with mock.patch(
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
iam_no_service_roles_at_project_level,
)
check = iam_no_service_roles_at_project_level()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"IAM Users assigned to service role '{binding.role}' at project level {GCP_PROJECT_ID}",
result[0].status_extended,
)
assert result[0].resource_id == binding.role