feat(gcp): add CIS checks (#2544)

This commit is contained in:
Sergio Garcia
2023-07-06 17:01:56 +02:00
committed by GitHub
parent b1968f3f8b
commit 676e60afb7
118 changed files with 4649 additions and 330 deletions

View File

@@ -40,6 +40,7 @@ class Test_apikeys_api_restrictions_configured:
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
@@ -55,7 +56,7 @@ class Test_apikeys_api_restrictions_configured:
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"API key {key.name} have restrictions configured.",
f"API key {key.name} has restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
@@ -74,6 +75,7 @@ class Test_apikeys_api_restrictions_configured:
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",
@@ -114,6 +116,7 @@ class Test_apikeys_api_restrictions_configured:
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_api_restrictions_configured.apikeys_api_restrictions_configured.apikeys_client",

View File

@@ -0,0 +1,65 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_apikeys_key_exists:
def test_apikeys_no_keys(self):
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = []
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_exists.apikeys_key_exists.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_key_exists.apikeys_key_exists import (
apikeys_key_exists,
)
check = apikeys_key_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Project {GCP_PROJECT_ID} does not have active API Keys.",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID
def test_one_compliant_key(self):
from prowler.providers.gcp.services.apikeys.apikeys_service import Key
key = Key(
name="test",
id="123",
creation_time="2023-06-01T11:21:41.627509Z",
restrictions={},
project_id=GCP_PROJECT_ID,
)
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_exists.apikeys_key_exists.apikeys_client",
new=apikeys_client,
):
from prowler.providers.gcp.services.apikeys.apikeys_key_exists.apikeys_key_exists import (
apikeys_key_exists,
)
check = apikeys_key_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Project {GCP_PROJECT_ID} has active API Keys.",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID

View File

@@ -35,6 +35,7 @@ class Test_apikeys_key_rotated_in_90_days:
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days.apikeys_client",
@@ -69,6 +70,7 @@ class Test_apikeys_key_rotated_in_90_days:
apikeys_client = mock.MagicMock
apikeys_client.project_ids = [GCP_PROJECT_ID]
apikeys_client.keys = [key]
apikeys_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.apikeys.apikeys_key_rotated_in_90_days.apikeys_key_rotated_in_90_days.apikeys_client",
@@ -84,7 +86,7 @@ class Test_apikeys_key_rotated_in_90_days:
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} creation date have more than 90 days.",
f"API key {key.name} creation date has more than 90 days.",
result[0].status_extended,
)
assert result[0].resource_id == key.id

View File

@@ -4,21 +4,21 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_block_project_wide_ssh_keys_disabled:
class Test_compute_instance_block_project_wide_ssh_keys_disabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
compute_instance_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
check = compute_instance_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 0
@@ -33,6 +33,7 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
metadata={"items": [{"key": "block-project-ssh-keys", "value": "true"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -44,14 +45,14 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
compute_instance_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
check = compute_instance_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
@@ -73,6 +74,7 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -84,14 +86,14 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
compute_instance_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
check = compute_instance_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
@@ -113,6 +115,7 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
metadata={"items": [{"key": "block-project-ssh-keys", "value": "false"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -124,14 +127,14 @@ class Test_compute_block_project_wide_ssh_keys_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_block_project_wide_ssh_keys_disabled.compute_block_project_wide_ssh_keys_disabled import (
compute_block_project_wide_ssh_keys_disabled,
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
compute_instance_block_project_wide_ssh_keys_disabled,
)
check = compute_block_project_wide_ssh_keys_disabled()
check = compute_instance_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1

View File

@@ -4,20 +4,20 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_default_service_account_in_use:
class Test_compute_instance_default_service_account_in_use:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use import (
compute_default_service_account_in_use,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use import (
compute_instance_default_service_account_in_use,
)
check = compute_default_service_account_in_use()
check = compute_instance_default_service_account_in_use()
result = check.execute()
assert len(result) == 0
@@ -32,6 +32,7 @@ class Test_compute_default_service_account_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
service_accounts=[{"email": "custom@developer.gserviceaccount.com"}],
@@ -43,14 +44,14 @@ class Test_compute_default_service_account_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use import (
compute_default_service_account_in_use,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use import (
compute_instance_default_service_account_in_use,
)
check = compute_default_service_account_in_use()
check = compute_instance_default_service_account_in_use()
result = check.execute()
assert len(result) == 1
@@ -72,6 +73,7 @@ class Test_compute_default_service_account_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
@@ -85,14 +87,14 @@ class Test_compute_default_service_account_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use import (
compute_default_service_account_in_use,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use import (
compute_instance_default_service_account_in_use,
)
check = compute_default_service_account_in_use()
check = compute_instance_default_service_account_in_use()
result = check.execute()
assert len(result) == 1
@@ -114,6 +116,7 @@ class Test_compute_default_service_account_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
@@ -127,14 +130,14 @@ class Test_compute_default_service_account_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use.compute_default_service_account_in_use import (
compute_default_service_account_in_use,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use.compute_instance_default_service_account_in_use import (
compute_instance_default_service_account_in_use,
)
check = compute_default_service_account_in_use()
check = compute_instance_default_service_account_in_use()
result = check.execute()
assert len(result) == 1

View File

@@ -4,20 +4,22 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_default_service_account_in_use_with_full_api_access:
class Test_compute_instance_default_service_account_in_use_with_full_api_access:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access import (
compute_default_service_account_in_use_with_full_api_access,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access import (
compute_instance_default_service_account_in_use_with_full_api_access,
)
check = compute_default_service_account_in_use_with_full_api_access()
check = (
compute_instance_default_service_account_in_use_with_full_api_access()
)
result = check.execute()
assert len(result) == 0
@@ -32,6 +34,7 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{"email": "123-compute@developer.gserviceaccount.com", "scopes": []}
],
@@ -45,14 +48,16 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access import (
compute_default_service_account_in_use_with_full_api_access,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access import (
compute_instance_default_service_account_in_use_with_full_api_access,
)
check = compute_default_service_account_in_use_with_full_api_access()
check = (
compute_instance_default_service_account_in_use_with_full_api_access()
)
result = check.execute()
assert len(result) == 1
@@ -74,6 +79,7 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{
"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com",
@@ -90,14 +96,16 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access import (
compute_default_service_account_in_use_with_full_api_access,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access import (
compute_instance_default_service_account_in_use_with_full_api_access,
)
check = compute_default_service_account_in_use_with_full_api_access()
check = (
compute_instance_default_service_account_in_use_with_full_api_access()
)
result = check.execute()
assert len(result) == 1
@@ -119,6 +127,7 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{
"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com",
@@ -135,14 +144,16 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_default_service_account_in_use_with_full_api_access.compute_default_service_account_in_use_with_full_api_access import (
compute_default_service_account_in_use_with_full_api_access,
from prowler.providers.gcp.services.compute.compute_instance_default_service_account_in_use_with_full_api_access.compute_instance_default_service_account_in_use_with_full_api_access import (
compute_instance_default_service_account_in_use_with_full_api_access,
)
check = compute_default_service_account_in_use_with_full_api_access()
check = (
compute_instance_default_service_account_in_use_with_full_api_access()
)
result = check.execute()
assert len(result) == 1

View File

@@ -4,21 +4,21 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_encryption_with_csek_is_disabled:
class Test_compute_instance_encryption_with_csek_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
from prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled import (
compute_instance_encryption_with_csek_enabled,
)
check = compute_encryption_with_csek_is_disabled()
check = compute_instance_encryption_with_csek_enabled()
result = check.execute()
assert len(result) == 0
@@ -33,6 +33,7 @@ class Test_compute_encryption_with_csek_is_disabled:
metadata={"items": [{"key": "block-project-ssh-keys", "value": "true"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", True), ("disk2", True)],
@@ -44,20 +45,20 @@ class Test_compute_encryption_with_csek_is_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
from prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled import (
compute_instance_encryption_with_csek_enabled,
)
check = compute_encryption_with_csek_is_disabled()
check = compute_instance_encryption_with_csek_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The VM Instance {instance.name} have every disk encrypted.",
f"The VM Instance {instance.name} has every disk encrypted.",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -73,6 +74,7 @@ class Test_compute_encryption_with_csek_is_disabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", True)],
@@ -84,20 +86,20 @@ class Test_compute_encryption_with_csek_is_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
from prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled import (
compute_instance_encryption_with_csek_enabled,
)
check = compute_encryption_with_csek_is_disabled()
check = compute_instance_encryption_with_csek_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} have the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
f"The VM Instance {instance.name} has the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -113,6 +115,7 @@ class Test_compute_encryption_with_csek_is_disabled:
metadata={"items": [{"key": "block-project-ssh-keys", "value": "false"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -124,20 +127,20 @@ class Test_compute_encryption_with_csek_is_disabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_encryption_with_csek_is_disabled.compute_encryption_with_csek_is_disabled import (
compute_encryption_with_csek_is_disabled,
from prowler.providers.gcp.services.compute.compute_instance_encryption_with_csek_enabled.compute_instance_encryption_with_csek_enabled import (
compute_instance_encryption_with_csek_enabled,
)
check = compute_encryption_with_csek_is_disabled()
check = compute_instance_encryption_with_csek_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"The VM Instance {instance.name} have the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
f"The VM Instance {instance.name} has the following unencrypted disks: '{', '.join([i[0] for i in instance.disks_encryption if not i[1]])}'",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -0,0 +1,111 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_instance_confidential_computing_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled import (
compute_instance_confidential_computing_enabled,
)
check = compute_instance_confidential_computing_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_instance(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled import (
compute_instance_confidential_computing_enabled,
)
check = compute_instance_confidential_computing_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"VM Instance {instance.name} has Confidential Computing enabled",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert result[0].resource_name == instance.name
assert result[0].location == instance.zone
assert result[0].project_id == GCP_PROJECT_ID
def test_one_instance_with_shielded_vtpm_disabled(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
public_ip=True,
metadata={},
shielded_enabled_vtpm=False,
shielded_enabled_integrity_monitoring=True,
confidential_computing=False,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_instance_confidential_computing_enabled.compute_instance_confidential_computing_enabled import (
compute_instance_confidential_computing_enabled,
)
check = compute_instance_confidential_computing_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"VM Instance {instance.name} does not have Confidential Computing enabled",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert result[0].resource_name == instance.name
assert result[0].location == instance.zone
assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -4,20 +4,20 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_ip_forwarding_is_enabled:
class Test_compute_instance_ip_forwarding_is_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
from prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled import (
compute_instance_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
check = compute_instance_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 0
@@ -32,6 +32,7 @@ class Test_compute_ip_forwarding_is_enabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[{"email": "123-compute@developer.gserviceaccount.com"}],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -43,14 +44,14 @@ class Test_compute_ip_forwarding_is_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
from prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled import (
compute_instance_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
check = compute_instance_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1
@@ -72,6 +73,7 @@ class Test_compute_ip_forwarding_is_enabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
@@ -85,14 +87,14 @@ class Test_compute_ip_forwarding_is_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
from prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled import (
compute_instance_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
check = compute_instance_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1
@@ -114,6 +116,7 @@ class Test_compute_ip_forwarding_is_enabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
@@ -127,14 +130,14 @@ class Test_compute_ip_forwarding_is_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ip_forwarding_is_enabled.compute_ip_forwarding_is_enabled import (
compute_ip_forwarding_is_enabled,
from prowler.providers.gcp.services.compute.compute_instance_ip_forwarding_is_enabled.compute_instance_ip_forwarding_is_enabled import (
compute_instance_ip_forwarding_is_enabled,
)
check = compute_ip_forwarding_is_enabled()
check = compute_instance_ip_forwarding_is_enabled()
result = check.execute()
assert len(result) == 1

View File

@@ -0,0 +1,99 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_loadbalancer_logging_enabled:
def test_compute_no_load_balancers(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.load_balancers = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled import (
compute_loadbalancer_logging_enabled,
)
check = compute_loadbalancer_logging_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_load_balancer(self):
from prowler.providers.gcp.services.compute.compute_service import LoadBalancer
load_balancer = LoadBalancer(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
logging=True,
service="test",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.load_balancers = [load_balancer]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled import (
compute_loadbalancer_logging_enabled,
)
check = compute_loadbalancer_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has logging enabled",
result[0].status_extended,
)
assert result[0].resource_id == load_balancer.id
assert result[0].resource_name == load_balancer.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region
def test_one_uncompliant_load_balancer(self):
from prowler.providers.gcp.services.compute.compute_service import LoadBalancer
load_balancer = LoadBalancer(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
logging=False,
service="test",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.load_balancers = [load_balancer]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_loadbalancer_logging_enabled.compute_loadbalancer_logging_enabled import (
compute_loadbalancer_logging_enabled,
)
check = compute_loadbalancer_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have logging enabled",
result[0].status_extended,
)
assert result[0].resource_id == load_balancer.id
assert result[0].resource_name == load_balancer.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region

View File

@@ -0,0 +1,126 @@
from re import search
from unittest import mock
from prowler.providers.gcp.services.dns.dns_service import Policy
GCP_PROJECT_ID = "123456789012"
class Test_compute_network_dns_logging_enabled:
def test_compute_no_networks(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = []
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled import (
compute_network_dns_logging_enabled,
)
check = compute_network_dns_logging_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_network(self):
from prowler.providers.gcp.services.compute.compute_service import Network
network = Network(
name="test", id="test_id", project_id=GCP_PROJECT_ID, subnet_mode="auto"
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = [network]
compute_client.region = "global"
policy = Policy(
name="test",
id="test_id",
logging=True,
networks=["test"],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.policies = [policy]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled.compute_client",
new=compute_client,
):
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled import (
compute_network_dns_logging_enabled,
)
check = compute_network_dns_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has DNS logging enabled",
result[0].status_extended,
)
assert result[0].resource_id == network.id
assert result[0].resource_name == network.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region
def test_one_uncompliant_network(self):
from prowler.providers.gcp.services.compute.compute_service import Network
network = Network(
name="test", id="test_id", project_id=GCP_PROJECT_ID, subnet_mode="auto"
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = [network]
compute_client.region = "global"
policy = Policy(
name="test",
id="test_id",
logging=False,
networks=["test"],
project_id=GCP_PROJECT_ID,
)
dns_client = mock.MagicMock
dns_client.project_ids = [GCP_PROJECT_ID]
dns_client.policies = [policy]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled.compute_client",
new=compute_client,
):
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled.dns_client",
new=dns_client,
):
from prowler.providers.gcp.services.compute.compute_network_dns_logging_enabled.compute_network_dns_logging_enabled import (
compute_network_dns_logging_enabled,
)
check = compute_network_dns_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have DNS logging enabled",
result[0].status_extended,
)
assert result[0].resource_id == network.id
assert result[0].resource_name == network.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region

View File

@@ -0,0 +1,98 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_network_not_legacy:
def test_compute_no_networks(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = []
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy import (
compute_network_not_legacy,
)
check = compute_network_not_legacy()
result = check.execute()
assert len(result) == 0
def test_one_compliant_network(self):
from prowler.providers.gcp.services.compute.compute_service import Network
network = Network(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
subnet_mode="custom",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = [network]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy import (
compute_network_not_legacy,
)
check = compute_network_not_legacy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"Network test is not legacy",
result[0].status_extended,
)
assert result[0].resource_id == network.id
assert result[0].resource_name == network.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region
def test_one_legacy_network(self):
from prowler.providers.gcp.services.compute.compute_service import Network
network = Network(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
subnet_mode="legacy",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.networks = [network]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_network_not_legacy.compute_network_not_legacy import (
compute_network_not_legacy,
)
check = compute_network_not_legacy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"Legacy network test exists",
result[0].status_extended,
)
assert result[0].resource_id == network.id
assert result[0].resource_name == network.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == compute_client.region

View File

@@ -0,0 +1,89 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_project_os_login_enabled:
def test_compute_no_project(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.projects = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
compute_project_os_login_enabled,
)
check = compute_project_os_login_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_project(self):
from prowler.providers.gcp.services.compute.compute_service import Project
project = Project(
id=GCP_PROJECT_ID,
enable_oslogin=True,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.projects = [project]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
compute_project_os_login_enabled,
)
check = compute_project_os_login_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Project {project.id} has OS Login enabled",
result[0].status_extended,
)
assert result[0].resource_id == project.id
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID
def test_one_non_compliant_project(self):
from prowler.providers.gcp.services.compute.compute_service import Project
project = Project(
id=GCP_PROJECT_ID,
enable_oslogin=False,
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.projects = [project]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
compute_project_os_login_enabled,
)
check = compute_project_os_login_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Project {project.id} does not have OS Login enabled",
result[0].status_extended,
)
assert result[0].resource_id == project.id
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -4,20 +4,20 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_rdp_access_from_the_internet_allowed:
class Test_compute_firewall_rdp_access_from_the_internet_allowed:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.firewalls = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 0
@@ -36,16 +36,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -71,16 +72,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -106,16 +108,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -141,16 +144,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -176,16 +180,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -211,16 +216,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -246,16 +252,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -281,16 +288,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -316,16 +324,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -354,16 +363,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -393,16 +403,17 @@ class Test_compute_rdp_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_rdp_access_from_the_internet_allowed.compute_rdp_access_from_the_internet_allowed import (
compute_rdp_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_rdp_access_from_the_internet_allowed.compute_firewall_rdp_access_from_the_internet_allowed import (
compute_firewall_rdp_access_from_the_internet_allowed,
)
check = compute_rdp_access_from_the_internet_allowed()
check = compute_firewall_rdp_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1

View File

@@ -4,21 +4,21 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_serial_ports_in_use:
class Test_compute_instance_serial_ports_in_use:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 0
@@ -33,6 +33,7 @@ class Test_compute_serial_ports_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -44,20 +45,20 @@ class Test_compute_serial_ports_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"VM Instance {instance.name} have Enable Connecting to Serial Ports off",
f"VM Instance {instance.name} has Enable Connecting to Serial Ports off",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -73,6 +74,7 @@ class Test_compute_serial_ports_in_use:
metadata={"items": [{"key": "serial-port-enabled", "value": "0"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -84,20 +86,20 @@ class Test_compute_serial_ports_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"VM Instance {instance.name} have Enable Connecting to Serial Ports off",
f"VM Instance {instance.name} has Enable Connecting to Serial Ports off",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -113,6 +115,7 @@ class Test_compute_serial_ports_in_use:
metadata={"items": [{"key": "serial-port-enabled", "value": "false"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -124,20 +127,20 @@ class Test_compute_serial_ports_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"VM Instance {instance.name} have Enable Connecting to Serial Ports off",
f"VM Instance {instance.name} has Enable Connecting to Serial Ports off",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -153,6 +156,7 @@ class Test_compute_serial_ports_in_use:
metadata={"items": [{"key": "serial-port-enable", "value": "1"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -164,20 +168,20 @@ class Test_compute_serial_ports_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"VM Instance {instance.name} have Enable Connecting to Serial Ports set to on",
f"VM Instance {instance.name} has Enable Connecting to Serial Ports set to on",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -193,6 +197,7 @@ class Test_compute_serial_ports_in_use:
metadata={"items": [{"key": "serial-port-enable", "value": "true"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -204,20 +209,20 @@ class Test_compute_serial_ports_in_use:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_serial_ports_in_use.compute_serial_ports_in_use import (
compute_serial_ports_in_use,
from prowler.providers.gcp.services.compute.compute_instance_serial_ports_in_use.compute_instance_serial_ports_in_use import (
compute_instance_serial_ports_in_use,
)
check = compute_serial_ports_in_use()
check = compute_instance_serial_ports_in_use()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"VM Instance {instance.name} have Enable Connecting to Serial Ports set to on",
f"VM Instance {instance.name} has Enable Connecting to Serial Ports set to on",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -4,21 +4,21 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_shielded_vm_enabled:
class Test_compute_instance_shielded_vm_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled import (
compute_shielded_vm_enabled,
from prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled import (
compute_instance_shielded_vm_enabled,
)
check = compute_shielded_vm_enabled()
check = compute_instance_shielded_vm_enabled()
result = check.execute()
assert len(result) == 0
@@ -33,6 +33,7 @@ class Test_compute_shielded_vm_enabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -44,20 +45,20 @@ class Test_compute_shielded_vm_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled import (
compute_shielded_vm_enabled,
from prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled import (
compute_instance_shielded_vm_enabled,
)
check = compute_shielded_vm_enabled()
check = compute_instance_shielded_vm_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"VM Instance {instance.name} have vTPM or Integrity Monitoring set to on",
f"VM Instance {instance.name} has vTPM or Integrity Monitoring set to on",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -73,6 +74,7 @@ class Test_compute_shielded_vm_enabled:
metadata={},
shielded_enabled_vtpm=False,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -84,20 +86,20 @@ class Test_compute_shielded_vm_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled import (
compute_shielded_vm_enabled,
from prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled import (
compute_instance_shielded_vm_enabled,
)
check = compute_shielded_vm_enabled()
check = compute_instance_shielded_vm_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"VM Instance {instance.name} don't have vTPM and Integrity Monitoring set to on",
f"VM Instance {instance.name} doesn't have vTPM and Integrity Monitoring set to on",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
@@ -113,6 +115,7 @@ class Test_compute_shielded_vm_enabled:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=False,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
@@ -124,20 +127,20 @@ class Test_compute_shielded_vm_enabled:
compute_client.instances = [instance]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled.compute_client",
"prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_shielded_vm_enabled.compute_shielded_vm_enabled import (
compute_shielded_vm_enabled,
from prowler.providers.gcp.services.compute.compute_instance_shielded_vm_enabled.compute_instance_shielded_vm_enabled import (
compute_instance_shielded_vm_enabled,
)
check = compute_shielded_vm_enabled()
check = compute_instance_shielded_vm_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"VM Instance {instance.name} don't have vTPM and Integrity Monitoring set to on",
f"VM Instance {instance.name} doesn't have vTPM and Integrity Monitoring set to on",
result[0].status_extended,
)
assert result[0].resource_id == instance.id

View File

@@ -4,20 +4,20 @@ from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_ssh_access_from_the_internet_allowed:
class Test_compute_firewall_ssh_access_from_the_internet_allowed:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.firewalls = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 0
@@ -36,16 +36,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -71,16 +72,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -106,16 +108,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -141,16 +144,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -176,16 +180,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -211,16 +216,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -246,16 +252,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -281,16 +288,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -316,16 +324,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -354,16 +363,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1
@@ -393,16 +403,17 @@ class Test_compute_ssh_access_from_the_internet_allowed:
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.firewalls = [firewall]
compute_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed.compute_client",
"prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_ssh_access_from_the_internet_allowed.compute_ssh_access_from_the_internet_allowed import (
compute_ssh_access_from_the_internet_allowed,
from prowler.providers.gcp.services.compute.compute_firewall_ssh_access_from_the_internet_allowed.compute_firewall_ssh_access_from_the_internet_allowed import (
compute_firewall_ssh_access_from_the_internet_allowed,
)
check = compute_ssh_access_from_the_internet_allowed()
check = compute_firewall_ssh_access_from_the_internet_allowed()
result = check.execute()
assert len(result) == 1

View File

@@ -0,0 +1,99 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_compute_subnet_flow_logs_enabled:
def test_compute_no_subnets(self):
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.subnets = []
with mock.patch(
"prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled import (
compute_subnet_flow_logs_enabled,
)
check = compute_subnet_flow_logs_enabled()
result = check.execute()
assert len(result) == 0
def test_one_compliant_subnet(self):
from prowler.providers.gcp.services.compute.compute_service import Subnet
subnet = Subnet(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
flow_logs=True,
network="network",
region="global",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.subnets = [subnet]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled import (
compute_subnet_flow_logs_enabled,
)
check = compute_subnet_flow_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has flow logs enabled",
result[0].status_extended,
)
assert result[0].resource_id == subnet.id
assert result[0].resource_name == subnet.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == subnet.region
def test_one_uncompliant_subnet(self):
from prowler.providers.gcp.services.compute.compute_service import Subnet
subnet = Subnet(
name="test",
id="test_id",
project_id=GCP_PROJECT_ID,
flow_logs=False,
network="network",
region="global",
)
compute_client = mock.MagicMock
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.subnets = [subnet]
with mock.patch(
"prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled.compute_client",
new=compute_client,
):
from prowler.providers.gcp.services.compute.compute_subnet_flow_logs_enabled.compute_subnet_flow_logs_enabled import (
compute_subnet_flow_logs_enabled,
)
check = compute_subnet_flow_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have flow logs enabled",
result[0].status_extended,
)
assert result[0].resource_id == subnet.id
assert result[0].resource_name == subnet.name
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == subnet.region

View File

@@ -63,7 +63,7 @@ class Test_dns_dnssec_disabled:
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud DNS {managed_zone.name} have DNSSEC enabled.",
f"Cloud DNS {managed_zone.name} has DNSSEC enabled.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id
@@ -110,7 +110,7 @@ class Test_dns_dnssec_disabled:
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud DNS {managed_zone.name} doens't have DNSSEC enabled.",
f"Cloud DNS {managed_zone.name} doesn't have DNSSEC enabled.",
result[0].status_extended,
)
assert result[0].resource_id == managed_zone.id

View File

@@ -0,0 +1,62 @@
from re import search
from unittest import mock
from prowler.providers.gcp.services.iam.iam_service import Setting
GCP_PROJECT_ID = "123456789012"
class Test_iam_account_access_approval_enabled:
def test_iam_no_settings(self):
accessapproval_client = mock.MagicMock
accessapproval_client.settings = {}
accessapproval_client.project_ids = [GCP_PROJECT_ID]
accessapproval_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_account_access_approval_enabled.iam_account_access_approval_enabled.accessapproval_client",
new=accessapproval_client,
):
from prowler.providers.gcp.services.iam.iam_account_access_approval_enabled.iam_account_access_approval_enabled import (
iam_account_access_approval_enabled,
)
check = iam_account_access_approval_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have Access Approval enabled",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == "global"
def test_iam_project_with_settings(self):
accessapproval_client = mock.MagicMock
accessapproval_client.settings = {
GCP_PROJECT_ID: Setting(name="test", project_id=GCP_PROJECT_ID)
}
accessapproval_client.project_ids = [GCP_PROJECT_ID]
accessapproval_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_account_access_approval_enabled.iam_account_access_approval_enabled.accessapproval_client",
new=accessapproval_client,
):
from prowler.providers.gcp.services.iam.iam_account_access_approval_enabled.iam_account_access_approval_enabled import (
iam_account_access_approval_enabled,
)
check = iam_account_access_approval_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has Access Approval enabled",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == "global"

View File

@@ -0,0 +1,92 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_iam_audit_logs_enabled:
def test_iam_no_projects(self):
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.projects = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
iam_audit_logs_enabled,
)
check = iam_audit_logs_enabled()
result = check.execute()
assert len(result) == 0
def test_compliant_project(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
project1 = Project(id=GCP_PROJECT_ID, audit_logging=True)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.projects = [project1]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
iam_audit_logs_enabled,
)
check = iam_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Audit Logs are enabled for project",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_uncompliant_project(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
project1 = Project(id=GCP_PROJECT_ID, audit_logging=False)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.projects = [project1]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
iam_audit_logs_enabled,
)
check = iam_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Audit Logs are not enabled for project",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -0,0 +1,81 @@
from re import search
from unittest import mock
from prowler.providers.gcp.services.iam.iam_service import Organization
GCP_PROJECT_ID = "123456789012"
class Test_iam_organization_essential_contacts_configured:
def test_iam_no_organizations(self):
essentialcontacts_client = mock.MagicMock
essentialcontacts_client.organizations = []
essentialcontacts_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured.essentialcontacts_client",
new=essentialcontacts_client,
):
from prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured import (
iam_organization_essential_contacts_configured,
)
check = iam_organization_essential_contacts_configured()
result = check.execute()
assert len(result) == 0
def test_iam_org_with_contacts(self):
essentialcontacts_client = mock.MagicMock
essentialcontacts_client.organizations = [
Organization(id="test_id", name="test", contacts=True)
]
essentialcontacts_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured.essentialcontacts_client",
new=essentialcontacts_client,
):
from prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured import (
iam_organization_essential_contacts_configured,
)
check = iam_organization_essential_contacts_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has essential contacts configured",
result[0].status_extended,
)
assert result[0].resource_id == "test_id"
assert result[0].resource_name == "test"
assert result[0].project_id == "test_id"
assert result[0].location == "global"
def test_iam_org_without_contacts(self):
essentialcontacts_client = mock.MagicMock
essentialcontacts_client.organizations = [
Organization(id="test_id", name="test", contacts=False)
]
essentialcontacts_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured.essentialcontacts_client",
new=essentialcontacts_client,
):
from prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured import (
iam_organization_essential_contacts_configured,
)
check = iam_organization_essential_contacts_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"does not have essential contacts configured",
result[0].status_extended,
)
assert result[0].resource_id == "test_id"
assert result[0].resource_name == "test"
assert result[0].project_id == "test_id"
assert result[0].location == "global"

View File

@@ -0,0 +1,129 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_iam_role_kms_enforce_separation_of_duties:
def test_iam_no_bindings(self):
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.bindings = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
iam_role_kms_enforce_separation_of_duties,
)
check = iam_role_kms_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for KMS-Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_three_compliant_binding(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/cloudfunctions.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/compute.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
iam_role_kms_enforce_separation_of_duties,
)
check = iam_role_kms_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for KMS-Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_uncompliant_binding(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/cloudkms.admin",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
iam_role_kms_enforce_separation_of_duties,
)
check = iam_role_kms_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Principle of separation of duties was not enforced for KMS-Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -0,0 +1,129 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
class Test_iam_role_sa_enforce_separation_of_duties:
def test_iam_no_bindings(self):
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.bindings = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
iam_role_sa_enforce_separation_of_duties,
)
check = iam_role_sa_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for Service-Account Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_three_compliant_binding(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/cloudfunctions.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/compute.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
iam_role_sa_enforce_separation_of_duties,
)
check = iam_role_sa_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for Service-Account Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_one_uncompliant_binding(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/iam.serviceAccountUser",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/compute.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
):
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
iam_role_sa_enforce_separation_of_duties,
)
check = iam_role_sa_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Principle of separation of duties was not enforced for Service-Account Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -0,0 +1,70 @@
from re import search
from unittest import mock
from prowler.providers.gcp.services.serviceusage.serviceusage_service import Service
GCP_PROJECT_ID = "123456789012"
class Test_serviceusage_cloudasset_inventory_enabled:
def test_serviceusage_no_active_services(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled import (
serviceusage_cloudasset_inventory_enabled,
)
check = serviceusage_cloudasset_inventory_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud Asset Inventory is not enabled in project {GCP_PROJECT_ID}",
result[0].status_extended,
)
assert result[0].resource_id == "cloudasset.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "Cloud Asset Inventory"
assert result[0].location == serviceusage_client.region
def test_serviceusage_active_cloudasset(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {
GCP_PROJECT_ID: [
Service(
name="cloudasset.googleapis.com",
title="Cloud Asset Inventory",
project_id=GCP_PROJECT_ID,
)
]
}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled import (
serviceusage_cloudasset_inventory_enabled,
)
check = serviceusage_cloudasset_inventory_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud Asset Inventory is enabled in project {GCP_PROJECT_ID}",
result[0].status_extended,
)
assert result[0].resource_id == "cloudasset.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "Cloud Asset Inventory"
assert result[0].location == serviceusage_client.region