feat(gcp): add 3 new checks for GKE CIS (#3440)

This commit is contained in:
Sergio Garcia
2024-02-27 18:05:21 +01:00
committed by GitHub
parent 354677bc7a
commit 5ee1e0a9eb
49 changed files with 661 additions and 52 deletions

View File

@@ -0,0 +1,33 @@
{
"Provider": "gcp",
"CheckID": "artifacts_container_analysis_enabled",
"CheckTitle": "Ensure Image Vulnerability Analysis using AR Container Analysis or a third-party provider",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "Artifact Registry",
"SubServiceName": "Container Analysis",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Service",
"Description": "Scan images stored in Google Container Registry (GCR) for vulnerabilities using AR Container Analysis or a third-party provider. This helps identify and mitigate security risks associated with known vulnerabilities in container images.",
"Risk": "Without image vulnerability scanning, container images stored in Artifact Registry may contain known vulnerabilities, increasing the risk of exploitation by malicious actors.",
"RelatedUrl": "https://cloud.google.com/artifact-analysis/docs",
"Remediation": {
"Code": {
"CLI": "gcloud services enable containeranalysis.googleapis.com",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable vulnerability scanning for images stored in Artifact Registry using AR Container Analysis or a third-party provider.",
"Url": "https://cloud.google.com/artifact-analysis/docs/container-scanning-overview"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "By default, AR Container Analysis is disabled."
}

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.serviceusage.serviceusage_client import (
serviceusage_client,
)
class artifacts_container_analysis_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
for project_id in serviceusage_client.project_ids:
report = Check_Report_GCP(self.metadata())
report.project_id = project_id
report.resource_id = "containeranalysis.googleapis.com"
report.resource_name = "AR Container Analysis"
report.location = serviceusage_client.region
report.status = "FAIL"
report.status_extended = (
f"AR Container Analysis is not enabled in project {project_id}."
)
for active_service in serviceusage_client.active_services.get(
project_id, []
):
if active_service.name == "containeranalysis.googleapis.com":
report.status = "PASS"
report.status_extended = (
f"AR Container Analysis is enabled in project {project_id}."
)
break
findings.append(report)
return findings

View File

@@ -0,0 +1,33 @@
{
"Provider": "gcp",
"CheckID": "gcr_container_scanning_enabled",
"CheckTitle": "Ensure Image Vulnerability Scanning using GCR Container Scanning or a third-party provider",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "Google Container Registry (GCR)",
"SubServiceName": "Container Scanning",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Service",
"Description": "Scan images stored in Google Container Registry (GCR) for vulnerabilities using GCR Container Scanning or a third-party provider. This helps identify and mitigate security risks associated with known vulnerabilities in container images.",
"Risk": "Without image vulnerability scanning, container images stored in GCR may contain known vulnerabilities, increasing the risk of exploitation by malicious actors.",
"RelatedUrl": "https://cloud.google.com/container-registry/docs/container-analysis",
"Remediation": {
"Code": {
"CLI": "gcloud services enable containerscanning.googleapis.com",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-gcp-gcr-container-vulnerability-scanning-is-enabled#terraform"
},
"Recommendation": {
"Text": "Enable vulnerability scanning for images stored in GCR using GCR Container Scanning or a third-party provider.",
"Url": "https://cloud.google.com/container-registry/docs/container-best-practices"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "By default, GCR Container Scanning is disabled."
}

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.serviceusage.serviceusage_client import (
serviceusage_client,
)
class gcr_container_scanning_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
for project_id in serviceusage_client.project_ids:
report = Check_Report_GCP(self.metadata())
report.project_id = project_id
report.resource_id = "containerscanning.googleapis.com"
report.resource_name = "GCR Container Scanning"
report.location = serviceusage_client.region
report.status = "FAIL"
report.status_extended = (
f"GCR Container Scanning is not enabled in project {project_id}."
)
for active_service in serviceusage_client.active_services.get(
project_id, []
):
if active_service.name == "containerscanning.googleapis.com":
report.status = "PASS"
report.status_extended = (
f"GCR Container Scanning is enabled in project {project_id}."
)
break
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.gcp.lib.audit_info.audit_info import gcp_audit_info
from prowler.providers.gcp.services.gke.gke_service import GKE
gke_client = GKE(gcp_audit_info)

View File

@@ -0,0 +1,33 @@
{
"Provider": "gcp",
"CheckID": "gke_cluster_no_default_service_account",
"CheckTitle": "Ensure GKE clusters are not running using the Compute Engine default service account",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "Google Kubernetes Engine (GKE)",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Service",
"Description": "Ensure GKE clusters are not running using the Compute Engine default service account. Create and use minimally privileged service accounts for GKE cluster nodes instead of using the Compute Engine default service account to minimize unnecessary permissions.",
"Risk": "Using the Compute Engine default service account for GKE cluster nodes may grant excessive permissions, increasing the risk of unauthorized access or compromise if a node is compromised.",
"RelatedUrl": "https://cloud.google.com/compute/docs/access/service-accounts#default_service_account",
"Remediation": {
"Code": {
"CLI": "gcloud container node-pools create [NODE_POOL] --service-account=[SA_NAME]@[PROJECT_ID].iam.gserviceaccount.com --cluster=[CLUSTER_NAME] --zone [COMPUTE_ZONE]",
"NativeIaC": "",
"Other": "",
"Terraform": "https://docs.bridgecrew.io/docs/ensure-gke-clusters-are-not-running-using-the-compute-engine-default-service-account#terraform"
},
"Recommendation": {
"Text": "Create and use minimally privileged service accounts for GKE cluster nodes instead of using the Compute Engine default service account.",
"Url": "https://cloud.google.com/compute/docs/access/service-accounts#default_service_account"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "By default, nodes use the Compute Engine default service account when you create a new cluster."
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.gke.gke_client import gke_client
class gke_cluster_no_default_service_account(Check):
def execute(self) -> Check_Report_GCP:
findings = []
for cluster in gke_client.clusters.values():
report = Check_Report_GCP(self.metadata())
report.project_id = cluster.project_id
report.resource_id = cluster.id
report.resource_name = cluster.name
report.location = cluster.location
report.status = "PASS"
report.status_extended = f"GKE cluster {cluster.name} is not using the Compute Engine default service account."
if not cluster.node_pools and cluster.service_account == "default":
report.status = "FAIL"
report.status_extended = f"GKE cluster {cluster.name} is using the Compute Engine default service account."
for node_pool in cluster.node_pools:
if node_pool.service_account == "default":
report.status = "FAIL"
report.status_extended = f"GKE cluster {cluster.name} is using the Compute Engine default service account."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,90 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.gcp.lib.service.service import GCPService
################## GKE
class GKE(GCPService):
def __init__(self, audit_info):
super().__init__("container", audit_info, api_version="v1beta1")
self.locations = []
self.__get_locations__()
self.clusters = {}
self.__threading_call__(self.__get_clusters__, self.locations)
def __get_locations__(self):
for project_id in self.project_ids:
try:
request = (
self.client.projects()
.locations()
.list(parent="projects/" + project_id)
)
response = request.execute()
for location in response["locations"]:
self.locations.append(
Location(name=location["name"], project_id=project_id)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_clusters__(self, location):
try:
request = (
self.client.projects()
.locations()
.clusters()
.list(
parent=f"projects/{location.project_id}/locations/{location.name}"
)
)
response = request.execute(http=self.__get_AuthorizedHttp_client__())
for cluster in response.get("clusters", []):
node_pools = []
for node_pool in cluster["nodePools"]:
node_pools.append(
NodePool(
name=node_pool["name"],
locations=node_pool["locations"],
service_account=node_pool["config"]["serviceAccount"],
project_id=location.project_id,
)
)
self.clusters[cluster["id"]] = Cluster(
name=cluster["name"],
id=cluster["id"],
location=cluster["location"],
service_account=cluster["nodeConfig"]["serviceAccount"],
node_pools=node_pools,
project_id=location.project_id,
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Location(BaseModel):
name: str
project_id: str
class NodePool(BaseModel):
name: str
locations: list
service_account: str
project_id: str
class Cluster(BaseModel):
name: str
id: str
location: str
service_account: str
node_pools: list[NodePool]
project_id: str

View File

@@ -1,10 +1,10 @@
{
"Provider": "gcp",
"CheckID": "serviceusage_cloudasset_inventory_enabled",
"CheckID": "iam_cloud_asset_inventory_enabled",
"CheckTitle": "Ensure Cloud Asset Inventory Is Enabled",
"CheckType": [],
"ServiceName": "serviceusage",
"SubServiceName": "",
"ServiceName": "iam",
"SubServiceName": "Asset Inventory",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Service",

View File

@@ -4,7 +4,7 @@ from prowler.providers.gcp.services.serviceusage.serviceusage_client import (
)
class serviceusage_cloudasset_inventory_enabled(Check):
class iam_cloud_asset_inventory_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
for project_id in serviceusage_client.project_ids:

View File

@@ -0,0 +1 @@
GCP_PROJECT_ID = "123456789012"

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_apikeys_api_restrictions_configured:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_apikeys_key_exists:

View File

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta, timezone
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_apikeys_key_rotated_in_90_days:

View File

@@ -0,0 +1,68 @@
from unittest import mock
from prowler.providers.gcp.services.serviceusage.serviceusage_service import Service
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_artifacts_container_analysis_enabled:
def test_serviceusage_no_active_services(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.artifacts.artifacts_container_analysis_enabled.artifacts_container_analysis_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.artifacts.artifacts_container_analysis_enabled.artifacts_container_analysis_enabled import (
artifacts_container_analysis_enabled,
)
check = artifacts_container_analysis_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"AR Container Analysis is not enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "containeranalysis.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "AR Container Analysis"
assert result[0].location == serviceusage_client.region
def test_serviceusage_active_cloudasset(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {
GCP_PROJECT_ID: [
Service(
name="containeranalysis.googleapis.com",
title="AR Container Analysis",
project_id=GCP_PROJECT_ID,
)
]
}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.artifacts.artifacts_container_analysis_enabled.artifacts_container_analysis_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.artifacts.artifacts_container_analysis_enabled.artifacts_container_analysis_enabled import (
artifacts_container_analysis_enabled,
)
check = artifacts_container_analysis_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AR Container Analysis is enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "containeranalysis.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "AR Container Analysis"
assert result[0].location == serviceusage_client.region

View File

@@ -1,6 +1,6 @@
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_bigquery_dataset_public_access:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_block_project_wide_ssh_keys_disabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_default_service_account_in_use:

View File

@@ -1,6 +1,6 @@
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_default_service_account_in_use_with_full_api_access:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_encryption_with_csek_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_confidential_computing_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_ip_forwarding_is_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_loadbalancer_logging_enabled:

View File

@@ -2,8 +2,7 @@ from re import search
from unittest import mock
from prowler.providers.gcp.services.dns.dns_service import Policy
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_network_dns_logging_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_network_not_legacy:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_project_os_login_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_firewall_rdp_access_from_the_internet_allowed:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_serial_ports_in_use:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_instance_shielded_vm_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_firewall_ssh_access_from_the_internet_allowed:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_compute_subnet_flow_logs_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_dataproc_encrypted_with_cmks_disabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_dns_dnssec_disabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_dns_rsasha1_in_use_to_key_sign_in_dnssec:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_dns_rsasha1_in_use_to_zone_sign_in_dnssec:

View File

@@ -0,0 +1,68 @@
from unittest import mock
from prowler.providers.gcp.services.serviceusage.serviceusage_service import Service
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_gcr_container_scanning_enabled:
def test_serviceusage_no_active_services(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gcr.gcr_container_scanning_enabled.gcr_container_scanning_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.gcr.gcr_container_scanning_enabled.gcr_container_scanning_enabled import (
gcr_container_scanning_enabled,
)
check = gcr_container_scanning_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"GCR Container Scanning is not enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "containerscanning.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "GCR Container Scanning"
assert result[0].location == serviceusage_client.region
def test_serviceusage_active_cloudasset(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {
GCP_PROJECT_ID: [
Service(
name="containerscanning.googleapis.com",
title="GCR Container Scanning",
project_id=GCP_PROJECT_ID,
)
]
}
serviceusage_client.project_ids = [GCP_PROJECT_ID]
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gcr.gcr_container_scanning_enabled.gcr_container_scanning_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.gcr.gcr_container_scanning_enabled.gcr_container_scanning_enabled import (
gcr_container_scanning_enabled,
)
check = gcr_container_scanning_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"GCR Container Scanning is enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "containerscanning.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].resource_name == "GCR Container Scanning"
assert result[0].location == serviceusage_client.region

View File

@@ -0,0 +1,196 @@
from unittest import mock
from prowler.providers.gcp.services.gke.gke_service import Cluster, NodePool
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_gke_cluster_no_default_service_account:
def test_gke_no_clusters(self):
gke_client = mock.MagicMock
gke_client.clusters = {}
with mock.patch(
"prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account.gke_client",
new=gke_client,
):
from prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account import (
gke_cluster_no_default_service_account,
)
check = gke_cluster_no_default_service_account()
result = check.execute()
assert len(result) == 0
def test_one_cluster_without_node_pool(self):
clusters = {
"123": Cluster(
name="test",
id="123",
location="eu-west-1",
service_account="default",
node_pools=[],
project_id=GCP_PROJECT_ID,
)
}
gke_client = mock.MagicMock
gke_client.project_ids = [GCP_PROJECT_ID]
gke_client.clusters = clusters
gke_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account.gke_client",
new=gke_client,
):
from prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account import (
gke_cluster_no_default_service_account,
)
check = gke_cluster_no_default_service_account()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"GKE cluster {clusters['123'].name} is using the Compute Engine default service account."
)
assert result[0].project_id == clusters["123"].project_id
assert result[0].resource_id == clusters["123"].id
assert result[0].resource_name == clusters["123"].name
assert result[0].location == clusters["123"].location
def test_one_cluster_without_node_pool_without_default_sa(self):
clusters = {
"123": Cluster(
name="test",
id="123",
location="eu-west-1",
service_account="1231231231",
node_pools=[],
project_id=GCP_PROJECT_ID,
)
}
gke_client = mock.MagicMock
gke_client.project_ids = [GCP_PROJECT_ID]
gke_client.clusters = clusters
gke_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account.gke_client",
new=gke_client,
):
from prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account import (
gke_cluster_no_default_service_account,
)
check = gke_cluster_no_default_service_account()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"GKE cluster {clusters['123'].name} is not using the Compute Engine default service account."
)
assert result[0].project_id == clusters["123"].project_id
assert result[0].resource_id == clusters["123"].id
assert result[0].resource_name == clusters["123"].name
assert result[0].location == clusters["123"].location
def test_one_cluster_with_node_pool_with_default_sa(self):
clusters = {
"123": Cluster(
name="test",
id="123",
location="eu-west-1",
service_account="default",
node_pools=[
NodePool(
name="test",
locations=["eu-west-1"],
service_account="default",
project_id=GCP_PROJECT_ID,
)
],
project_id=GCP_PROJECT_ID,
)
}
gke_client = mock.MagicMock
gke_client.project_ids = [GCP_PROJECT_ID]
gke_client.clusters = clusters
gke_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account.gke_client",
new=gke_client,
):
from prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account import (
gke_cluster_no_default_service_account,
)
check = gke_cluster_no_default_service_account()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"GKE cluster {clusters['123'].name} is using the Compute Engine default service account."
)
assert result[0].project_id == clusters["123"].project_id
assert result[0].resource_id == clusters["123"].id
assert result[0].resource_name == clusters["123"].name
assert result[0].location == clusters["123"].location
def test_one_cluster_with_node_pool_with_non_default_sa(self):
clusters = {
"123": Cluster(
name="test",
id="123",
location="eu-west-1",
service_account="default",
node_pools=[
NodePool(
name="test",
locations=["eu-west-1"],
service_account="123123123",
project_id=GCP_PROJECT_ID,
)
],
project_id=GCP_PROJECT_ID,
)
}
gke_client = mock.MagicMock
gke_client.project_ids = [GCP_PROJECT_ID]
gke_client.clusters = clusters
gke_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account.gke_client",
new=gke_client,
):
from prowler.providers.gcp.services.gke.gke_cluster_no_default_service_account.gke_cluster_no_default_service_account import (
gke_cluster_no_default_service_account,
)
check = gke_cluster_no_default_service_account()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"GKE cluster {clusters['123'].name} is not using the Compute Engine default service account."
)
assert result[0].project_id == clusters["123"].project_id
assert result[0].resource_id == clusters["123"].id
assert result[0].resource_name == clusters["123"].name
assert result[0].location == clusters["123"].location

View File

@@ -2,8 +2,7 @@ from re import search
from unittest import mock
from prowler.providers.gcp.services.iam.iam_service import Setting
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_iam_account_access_approval_enabled:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_iam_audit_logs_enabled:

View File

@@ -1,4 +1,3 @@
from re import search
from unittest import mock
from prowler.providers.gcp.services.serviceusage.serviceusage_service import Service
@@ -6,7 +5,7 @@ from prowler.providers.gcp.services.serviceusage.serviceusage_service import Ser
GCP_PROJECT_ID = "123456789012"
class Test_serviceusage_cloudasset_inventory_enabled:
class Test_iam_cloud_asset_inventory_enabled:
def test_serviceusage_no_active_services(self):
serviceusage_client = mock.MagicMock
serviceusage_client.active_services = {}
@@ -14,20 +13,20 @@ class Test_serviceusage_cloudasset_inventory_enabled:
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled.serviceusage_client",
"prowler.providers.gcp.services.iam.iam_cloud_asset_inventory_enabled.iam_cloud_asset_inventory_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled import (
serviceusage_cloudasset_inventory_enabled,
from prowler.providers.gcp.services.iam.iam_cloud_asset_inventory_enabled.iam_cloud_asset_inventory_enabled import (
iam_cloud_asset_inventory_enabled,
)
check = serviceusage_cloudasset_inventory_enabled()
check = iam_cloud_asset_inventory_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Cloud Asset Inventory is not enabled in project {GCP_PROJECT_ID}",
result[0].status_extended,
assert (
result[0].status_extended
== f"Cloud Asset Inventory is not enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "cloudasset.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID
@@ -49,20 +48,20 @@ class Test_serviceusage_cloudasset_inventory_enabled:
serviceusage_client.region = "global"
with mock.patch(
"prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled.serviceusage_client",
"prowler.providers.gcp.services.iam.iam_cloud_asset_inventory_enabled.iam_cloud_asset_inventory_enabled.serviceusage_client",
new=serviceusage_client,
):
from prowler.providers.gcp.services.serviceusage.serviceusage_cloudasset_inventory_enabled.serviceusage_cloudasset_inventory_enabled import (
serviceusage_cloudasset_inventory_enabled,
from prowler.providers.gcp.services.iam.iam_cloud_asset_inventory_enabled.iam_cloud_asset_inventory_enabled import (
iam_cloud_asset_inventory_enabled,
)
check = serviceusage_cloudasset_inventory_enabled()
check = iam_cloud_asset_inventory_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Cloud Asset Inventory is enabled in project {GCP_PROJECT_ID}",
result[0].status_extended,
assert (
result[0].status_extended
== f"Cloud Asset Inventory is enabled in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == "cloudasset.googleapis.com"
assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_iam_no_service_roles_at_project_level:

View File

@@ -3,8 +3,6 @@ from unittest import mock
from prowler.providers.gcp.services.iam.iam_service import Organization
GCP_PROJECT_ID = "123456789012"
class Test_iam_organization_essential_contacts_configured:
def test_iam_no_organizations(self):

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_iam_role_kms_enforce_separation_of_duties:

View File

@@ -1,7 +1,7 @@
from re import search
from unittest import mock
GCP_PROJECT_ID = "123456789012"
from tests.providers.gcp.lib.audit_info_utils import GCP_PROJECT_ID
class Test_iam_role_sa_enforce_separation_of_duties: