feat(gcp): add --project-ids flag and scan all projects by default (#2393)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2023-06-06 11:56:39 +02:00
committed by GitHub
parent 3c7580f024
commit a8f03d859c
77 changed files with 855 additions and 650 deletions

View File

@@ -184,7 +184,7 @@ Those credentials must be associated to a user or service account with proper pe
- Security Reviewer
- Stackdriver Account Viewer
> `prowler` will scan the project associated with the credentials.
> By default, `prowler` will scan all accessible GCP Projects, use flag `--project-ids` to specify the projects to be scanned.
# 💻 Basic Usage
@@ -267,7 +267,7 @@ Optionally, you can provide the location of an application credential JSON file
```console
prowler gcp --credentials-file path
```
> By default, `prowler` will scan all accessible GCP Projects, use flag `--project-ids` to specify the projects to be scanned.
# 📃 License

View File

@@ -96,4 +96,4 @@ Those credentials must be associated to a user or service account with proper pe
- Security Reviewer
- Stackdriver Account Viewer
> `prowler` will scan the project associated with the credentials.
> By default, `prowler` will scan all accessible GCP Projects, use flag `--project-ids` to specify the projects to be scanned.

View File

@@ -276,7 +276,7 @@ prowler azure --managed-identity-auth
See more details about Azure Authentication in [Requirements](getting-started/requirements.md)
Prowler by default scans all the subscriptions that is allowed to scan, if you want to scan a single subscription or various concrete subscriptions you can use the following flag (using az cli auth as example):
Prowler by default scans all the subscriptions that is allowed to scan, if you want to scan a single subscription or various specific subscriptions you can use the following flag (using az cli auth as example):
```console
prowler azure --az-cli-auth --subscription-ids <subscription ID 1> <subscription ID 2> ... <subscription ID N>
```
@@ -296,6 +296,9 @@ Otherwise, you can generate and download Service Account keys in JSON format (re
prowler gcp --credentials-file path
```
> `prowler` will scan the GCP project associated with the credentials.
Prowler by default scans all the GCP Projects that is allowed to scan, if you want to scan a single project or various specific projects you can use the following flag:
```console
prowler gcp --project-ids <Project ID 1> <Project ID 2> ... <Project ID N>
```
See more details about GCP Authentication in [Requirements](getting-started/requirements.md)

View File

@@ -443,7 +443,7 @@ Detailed documentation at https://docs.prowler.cloud
"--subscription-ids",
nargs="+",
default=[],
help="Azure subscription ids to be scanned by prowler",
help="Azure Subscription IDs to be scanned by Prowler",
)
azure_parser.add_argument(
"--tenant-id",
@@ -466,3 +466,11 @@ Detailed documentation at https://docs.prowler.cloud
metavar="FILE_PATH",
help="Authenticate using a Google Service Account Application Credentials JSON file",
)
# Subscriptions
gcp_subscriptions_subparser = gcp_parser.add_argument_group("Projects")
gcp_subscriptions_subparser.add_argument(
"--project-ids",
nargs="+",
default=[],
help="GCP Project IDs to be scanned by Prowler",
)

View File

@@ -491,8 +491,8 @@ def get_gcp_html_assessment_summary(audit_info):
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Project ID:</b> """
+ audit_info.project_id
<b>GCP Project IDs:</b> """
+ ", ".join(audit_info.project_ids)
+ """
</li>
</ul>

View File

@@ -363,7 +363,7 @@ def generate_provider_output_json(
)
if provider == "gcp":
finding_output.ProjectId = audit_info.project_id
finding_output.ProjectId = finding.project_id
finding_output.Location = finding.location
finding_output.ResourceId = finding.resource_id
finding_output.ResourceName = finding.resource_name

View File

@@ -30,7 +30,7 @@ def create_message_identity(provider, audit_info):
if provider == "aws":
identity = f"AWS Account *{audit_info.audited_account}*"
elif provider == "gcp":
identity = f"GCP Project *{audit_info.project_id}*"
identity = f"GCP Projects *{', '.join(audit_info.project_ids)}*"
logo = gcp_logo
elif provider == "azure":
printed_subscriptions = []

View File

@@ -30,8 +30,8 @@ def display_summary_table(
entity_type = "Tenant ID/s"
audited_entities = " ".join(audit_info.identity.tenant_ids)
elif provider == "gcp":
entity_type = "Project ID"
audited_entities = audit_info.project_id
entity_type = "Project ID/s"
audited_entities = ", ".join(audit_info.project_ids)
if findings:
current = {

View File

@@ -50,7 +50,7 @@ class Audit_Info:
report = f"""
This report is being generated using credentials below:
GCP Account: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} GCP Project ID: {Fore.YELLOW}[{audit_info.project_id}]{Style.RESET_ALL}
GCP Account: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} GCP Project IDs: {Fore.YELLOW}[{", ".join(audit_info.project_ids)}]{Style.RESET_ALL}
"""
print(report)
@@ -301,17 +301,20 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
set_gcp_audit_info returns the GCP_Audit_Info
"""
logger.info("Setting GCP session ...")
project_ids = arguments.get("project_ids")
logger.info("Checking if any credentials mode is set ...")
credentials_file = arguments.get("credentials_file")
gcp_provider = GCP_Provider(
credentials_file,
project_ids,
)
(
gcp_audit_info.credentials,
gcp_audit_info.project_id,
gcp_audit_info.default_project_id,
gcp_audit_info.project_ids,
) = gcp_provider.get_credentials()
if not arguments.get("only_logs"):

View File

@@ -86,9 +86,7 @@ class Gcp_Output_Options(Provider_Output_Options):
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = (
f"prowler-output-{audit_info.project_id}-{output_file_timestamp}"
)
self.output_filename = f"prowler-output-{audit_info.default_project_id}-{output_file_timestamp}"
else:
self.output_filename = arguments.output_filename

View File

@@ -13,13 +13,35 @@ class GCP_Provider:
def __init__(
self,
credentials_file: str,
input_project_ids: list,
):
logger.info("Instantiating GCP Provider ...")
self.credentials, self.project_id = self.__set_credentials__(credentials_file)
if not self.project_id:
self.credentials, self.default_project_id = self.__set_credentials__(
credentials_file
)
if not self.default_project_id:
logger.critical("No Project ID associated to Google Credentials.")
sys.exit(1)
self.project_ids = []
accessible_projects = self.get_project_ids()
if not accessible_projects:
logger.critical("No Project IDs can be accessed via Google Credentials.")
sys.exit(1)
if input_project_ids:
for input_project in input_project_ids:
if input_project in accessible_projects:
self.project_ids.append(input_project)
else:
logger.critical(
f"Project {input_project} cannot be accessed via Google Credentials."
)
sys.exit(1)
else:
# If not projects were input, all accessible projects are scanned by default
self.project_ids = accessible_projects
def __set_credentials__(self, credentials_file):
try:
if credentials_file:
@@ -27,7 +49,9 @@ class GCP_Provider:
return auth.default()
except Exception as error:
logger.critical(f"{error.__class__.__name__} -- {error}")
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def __set_gcp_creds_env_var__(self, credentials_file):
@@ -38,7 +62,34 @@ class GCP_Provider:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = client_secrets_path
def get_credentials(self):
return self.credentials, self.project_id
return self.credentials, self.default_project_id, self.project_ids
def get_project_ids(self):
try:
project_ids = []
service = discovery.build(
"cloudresourcemanager", "v1", credentials=self.credentials
)
request = service.projects().list()
while request is not None:
response = request.execute()
for project in response.get("projects", []):
project_ids.append(project["projectId"])
request = service.projects().list_next(
previous_request=request, previous_response=response
)
return project_ids
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return []
def generate_client(

View File

@@ -2,7 +2,8 @@ from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
gcp_audit_info = GCP_Audit_Info(
credentials=None,
project_id=None,
default_project_id=None,
project_ids=[],
audit_resources=None,
audit_metadata=None,
)

View File

@@ -7,12 +7,21 @@ from google.oauth2.credentials import Credentials
@dataclass
class GCP_Audit_Info:
credentials: Credentials
project_id: str
default_project_id: str
project_ids: list
audit_resources: Optional[Any]
audit_metadata: Optional[Any]
def __init__(self, credentials, project_id, audit_metadata, audit_resources):
def __init__(
self,
credentials,
default_project_id,
project_ids,
audit_metadata,
audit_resources,
):
self.credentials = credentials
self.project_id = project_id
self.default_project_id = default_project_id
self.project_ids = project_ids
self.audit_metadata = audit_metadata
self.audit_resources = audit_resources

View File

@@ -7,7 +7,7 @@ class bigquery_dataset_cmk_encryption(Check):
findings = []
for dataset in bigquery_client.datasets:
report = Check_Report_GCP(self.metadata())
report.project_id = bigquery_client.project_id
report.project_id = dataset.project_id
report.resource_id = dataset.id
report.resource_name = dataset.name
report.location = dataset.region

View File

@@ -7,7 +7,7 @@ class bigquery_dataset_public_access(Check):
findings = []
for dataset in bigquery_client.datasets:
report = Check_Report_GCP(self.metadata())
report.project_id = bigquery_client.project_id
report.project_id = dataset.project_id
report.resource_id = dataset.id
report.resource_name = dataset.name
report.location = dataset.region

View File

@@ -9,7 +9,7 @@ class BigQuery:
def __init__(self, audit_info):
self.service = "bigquery"
self.api_version = "v2"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.client = generate_client(self.service, self.api_version, audit_info)
self.datasets = []
self.tables = []
@@ -17,52 +17,54 @@ class BigQuery:
self.__get_tables__()
def __get_datasets__(self):
try:
request = self.client.datasets().list(projectId=self.project_id)
while request is not None:
response = request.execute()
for project_id in self.project_ids:
try:
request = self.client.datasets().list(projectId=project_id)
while request is not None:
response = request.execute()
for dataset in response.get("datasets", []):
dataset_info = (
self.client.datasets()
.get(
projectId=self.project_id,
datasetId=dataset["datasetReference"]["datasetId"],
for dataset in response.get("datasets", []):
dataset_info = (
self.client.datasets()
.get(
projectId=project_id,
datasetId=dataset["datasetReference"]["datasetId"],
)
.execute()
)
.execute()
)
cmk_encryption = False
public = False
roles = dataset_info.get("access", "")
if "allAuthenticatedUsers" in str(roles) or "allUsers" in str(
roles
):
public = True
if dataset_info.get("defaultEncryptionConfiguration"):
cmk_encryption = True
self.datasets.append(
Dataset(
name=dataset["datasetReference"]["datasetId"],
id=dataset["id"],
region=dataset["location"],
cmk_encryption=cmk_encryption,
public=public,
cmk_encryption = False
public = False
roles = dataset_info.get("access", "")
if "allAuthenticatedUsers" in str(roles) or "allUsers" in str(
roles
):
public = True
if dataset_info.get("defaultEncryptionConfiguration"):
cmk_encryption = True
self.datasets.append(
Dataset(
name=dataset["datasetReference"]["datasetId"],
id=dataset["id"],
region=dataset["location"],
cmk_encryption=cmk_encryption,
public=public,
project_id=project_id,
)
)
)
request = self.client.datasets().list_next(
previous_request=request, previous_response=response
request = self.client.datasets().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_tables__(self):
try:
for dataset in self.datasets:
for dataset in self.datasets:
try:
request = self.client.tables().list(
projectId=self.project_id, datasetId=dataset.name
projectId=dataset.project_id, datasetId=dataset.name
)
while request is not None:
response = request.execute()
@@ -72,7 +74,7 @@ class BigQuery:
if (
self.client.tables()
.get(
projectId=self.project_id,
projectId=dataset.project_id,
datasetId=dataset.name,
tableId=table["tableReference"]["tableId"],
)
@@ -86,16 +88,17 @@ class BigQuery:
id=table["id"],
region=dataset.region,
cmk_encryption=cmk_encryption,
project_id=dataset.project_id,
)
)
request = self.client.tables().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Dataset(BaseModel):
@@ -104,6 +107,7 @@ class Dataset(BaseModel):
region: str
cmk_encryption: bool
public: bool
project_id: str
class Table(BaseModel):
@@ -111,3 +115,4 @@ class Table(BaseModel):
id: str
region: str
cmk_encryption: bool
project_id: str

View File

@@ -7,7 +7,7 @@ class bigquery_table_cmk_encryption(Check):
findings = []
for table in bigquery_client.tables:
report = Check_Report_GCP(self.metadata())
report.project_id = bigquery_client.project_id
report.project_id = table.project_id
report.resource_id = table.id
report.resource_name = table.name
report.location = table.region

View File

@@ -10,7 +10,7 @@ class CloudResourceManager:
self.service = "cloudresourcemanager"
self.api_version = "v1"
self.region = "global"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.client = generate_client(self.service, self.api_version, audit_info)
self.bindings = []
self.__get_iam_policy__()
@@ -19,23 +19,26 @@ class CloudResourceManager:
return self.client
def __get_iam_policy__(self):
try:
policy = (
self.client.projects().getIamPolicy(resource=self.project_id).execute()
)
for binding in policy["bindings"]:
self.bindings.append(
Binding(
role=binding["role"],
members=binding["members"],
)
for project_id in self.project_ids:
try:
policy = (
self.client.projects().getIamPolicy(resource=project_id).execute()
)
for binding in policy["bindings"]:
self.bindings.append(
Binding(
role=binding["role"],
members=binding["members"],
project_id=project_id,
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Binding(BaseModel):
role: str
members: list
project_id: str

View File

@@ -7,7 +7,7 @@ class cloudsql_instance_automated_backups(Check):
findings = []
for instance in cloudsql_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_mysql_local_infile_flag(Check):
for instance in cloudsql_client.instances:
if "MYSQL" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_mysql_skip_show_database_flag(Check):
for instance in cloudsql_client.instances:
if "MYSQL" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_postgres_enable_pgaudit_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_postgres_log_connections_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_postgres_log_disconnections_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_postgres_log_error_verbosity_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_postgres_log_min_duration_statement_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -9,7 +9,7 @@ class cloudsql_instance_postgres_log_min_error_statement_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -9,7 +9,7 @@ class cloudsql_instance_postgres_log_min_messages_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -9,7 +9,7 @@ class cloudsql_instance_postgres_log_statement_flag(Check):
for instance in cloudsql_client.instances:
if "POSTGRES" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -7,7 +7,7 @@ class cloudsql_instance_private_ip_assignment(Check):
findings = []
for instance in cloudsql_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -7,7 +7,7 @@ class cloudsql_instance_public_access(Check):
findings = []
for instance in cloudsql_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -7,7 +7,7 @@ class cloudsql_instance_public_ip(Check):
findings = []
for instance in cloudsql_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_contained_database_authentication_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_cross_db_ownership_chaining_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_external_scripts_enabled_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_remote_access_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_trace_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_user_connections_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -8,7 +8,7 @@ class cloudsql_instance_sqlserver_user_options_flag(Check):
for instance in cloudsql_client.instances:
if "SQLSERVER" in instance.version:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -7,7 +7,7 @@ class cloudsql_instance_ssl_connections(Check):
findings = []
for instance in cloudsql_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudsql_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.name
report.resource_name = instance.name
report.location = instance.region

View File

@@ -9,49 +9,51 @@ class CloudSQL:
def __init__(self, audit_info):
self.service = "sqladmin"
self.api_version = "v1"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.client = generate_client(self.service, self.api_version, audit_info)
self.instances = []
self.__get_instances__()
def __get_instances__(self):
try:
request = self.client.instances().list(project=self.project_id)
while request is not None:
response = request.execute()
for project_id in self.project_ids:
try:
request = self.client.instances().list(project=project_id)
while request is not None:
response = request.execute()
for instance in response.get("items", []):
public_ip = False
for address in instance.get("ipAddresses", []):
if address["type"] == "PRIMARY":
public_ip = True
self.instances.append(
Instance(
name=instance["name"],
version=instance["databaseVersion"],
region=instance["region"],
ip_addresses=instance.get("ipAddresses", []),
public_ip=public_ip,
ssl=instance["settings"]["ipConfiguration"].get(
"requireSsl", False
),
automated_backups=instance["settings"][
"backupConfiguration"
]["enabled"],
authorized_networks=instance["settings"]["ipConfiguration"][
"authorizedNetworks"
],
flags=instance["settings"].get("databaseFlags", []),
for instance in response.get("items", []):
public_ip = False
for address in instance.get("ipAddresses", []):
if address["type"] == "PRIMARY":
public_ip = True
self.instances.append(
Instance(
name=instance["name"],
version=instance["databaseVersion"],
region=instance["region"],
ip_addresses=instance.get("ipAddresses", []),
public_ip=public_ip,
ssl=instance["settings"]["ipConfiguration"].get(
"requireSsl", False
),
automated_backups=instance["settings"][
"backupConfiguration"
]["enabled"],
authorized_networks=instance["settings"][
"ipConfiguration"
]["authorizedNetworks"],
flags=instance["settings"].get("databaseFlags", []),
project_id=project_id,
)
)
)
request = self.client.instances().list_next(
previous_request=request, previous_response=response
request = self.client.instances().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Instance(BaseModel):
@@ -64,3 +66,4 @@ class Instance(BaseModel):
ssl: bool
automated_backups: bool
flags: list
project_id: str

View File

@@ -9,7 +9,7 @@ class cloudstorage_bucket_public_access(Check):
findings = []
for bucket in cloudstorage_client.buckets:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudstorage_client.project_id
report.project_id = bucket.project_id
report.resource_id = bucket.id
report.resource_name = bucket.name
report.location = bucket.region

View File

@@ -9,7 +9,7 @@ class cloudstorage_bucket_uniform_bucket_level_access(Check):
findings = []
for bucket in cloudstorage_client.buckets:
report = Check_Report_GCP(self.metadata())
report.project_id = cloudstorage_client.project_id
report.project_id = bucket.project_id
report.resource_id = bucket.id
report.resource_name = bucket.name
report.location = bucket.region

View File

@@ -9,46 +9,48 @@ class CloudStorage:
def __init__(self, audit_info):
self.service = "storage"
self.api_version = "v1"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.client = generate_client(self.service, self.api_version, audit_info)
self.buckets = []
self.__get_buckets__()
def __get_buckets__(self):
try:
request = self.client.buckets().list(project=self.project_id)
while request is not None:
response = request.execute()
for bucket in response.get("items", []):
bucket_iam = (
self.client.buckets()
.getIamPolicy(bucket=bucket["id"])
.execute()["bindings"]
)
public = False
if "allAuthenticatedUsers" in str(bucket_iam) or "allUsers" in str(
bucket_iam
):
public = True
self.buckets.append(
Bucket(
name=bucket["name"],
id=bucket["id"],
region=bucket["location"],
uniform_bucket_level_access=bucket["iamConfiguration"][
"uniformBucketLevelAccess"
]["enabled"],
public=public,
for project_id in self.project_ids:
try:
request = self.client.buckets().list(project=project_id)
while request is not None:
response = request.execute()
for bucket in response.get("items", []):
bucket_iam = (
self.client.buckets()
.getIamPolicy(bucket=bucket["id"])
.execute()["bindings"]
)
public = False
if "allAuthenticatedUsers" in str(
bucket_iam
) or "allUsers" in str(bucket_iam):
public = True
self.buckets.append(
Bucket(
name=bucket["name"],
id=bucket["id"],
region=bucket["location"],
uniform_bucket_level_access=bucket["iamConfiguration"][
"uniformBucketLevelAccess"
]["enabled"],
public=public,
project_id=project_id,
)
)
)
request = self.client.buckets().list_next(
previous_request=request, previous_response=response
request = self.client.buckets().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Bucket(BaseModel):
@@ -57,3 +59,4 @@ class Bucket(BaseModel):
region: str
uniform_bucket_level_access: bool
public: bool
project_id: str

View File

@@ -7,7 +7,7 @@ class compute_default_service_account_in_use(Check):
findings = []
for instance in compute_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.id
report.resource_name = instance.name
report.location = instance.zone
@@ -16,10 +16,7 @@ class compute_default_service_account_in_use(Check):
if (
any(
[
(
sa["email"]
== f"{compute_client.project_id}-compute@developer.gserviceaccount.com"
)
("-compute@developer.gserviceaccount.com" in sa["email"])
for sa in instance.service_accounts
]
)

View File

@@ -7,7 +7,7 @@ class compute_default_service_account_in_use_with_full_api_access(Check):
findings = []
for instance in compute_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.id
report.resource_name = instance.name
report.location = instance.zone
@@ -15,8 +15,7 @@ class compute_default_service_account_in_use_with_full_api_access(Check):
report.status_extended = f"The VM Instance {instance.name} is not configured to use the default service account with full access to all cloud APIs "
for service_account in instance.service_accounts:
if (
service_account["email"]
== f"{compute_client.project_id}-compute@developer.gserviceaccount.com"
"-compute@developer.gserviceaccount.com" in service_account["email"]
and "https://www.googleapis.com/auth/cloud-platform"
in service_account["scopes"]
and instance.name[:4] != "gke-"

View File

@@ -7,7 +7,7 @@ class compute_instance_public_ip(Check):
findings = []
for instance in compute_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.id
report.resource_name = instance.name
report.location = instance.zone

View File

@@ -5,18 +5,32 @@ from prowler.providers.gcp.services.compute.compute_client import compute_client
class compute_network_default_in_use(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.resource_id = "default"
report.resource_name = "default"
report.location = "global"
report.status = "PASS"
report.status_extended = "Default network does not exist"
# Check if default network is in use for each project
projects_with_default_network = set()
for network in compute_client.networks:
if network.name == "default":
projects_with_default_network.add(network.project_id)
report = Check_Report_GCP(self.metadata())
report.project_id = network.project_id
report.resource_id = "default"
report.resource_name = "default"
report.location = "global"
report.status = "FAIL"
report.status_extended = "Default network is in use"
report.status_extended = (
f"Default network is in use in project {network.project_id}"
)
findings.append(report)
findings.append(report)
for project in compute_client.project_ids:
if project not in projects_with_default_network:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = "default"
report.resource_name = "default"
report.location = "global"
report.status = "PASS"
report.status_extended = (
f"Default network does not exist in project {project}"
)
return findings

View File

@@ -7,7 +7,7 @@ class compute_serial_ports_in_use(Check):
findings = []
for instance in compute_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.id
report.resource_name = instance.name
report.location = instance.zone

View File

@@ -9,9 +9,10 @@ class Compute:
def __init__(self, audit_info):
self.service = "compute"
self.api_version = "v1"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.default_project_id = audit_info.default_project_id
self.client = generate_client(self.service, self.api_version, audit_info)
self.zones = []
self.zones = set()
self.instances = []
self.networks = []
self.__get_zones__()
@@ -19,83 +20,88 @@ class Compute:
self.__get_networks__()
def __get_zones__(self):
try:
request = self.client.zones().list(project=self.project_id)
while request is not None:
response = request.execute()
for zone in response.get("items", []):
self.zones.append(zone["name"])
request = self.client.zones().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_instances__(self):
try:
for zone in self.zones:
request = self.client.instances().list(
project=self.project_id, zone=zone
)
for project_id in self.project_ids:
try:
request = self.client.zones().list(project=project_id)
while request is not None:
response = request.execute()
for instance in response.get("items", []):
public_ip = False
for interface in instance["networkInterfaces"]:
for config in interface.get("accessConfigs", []):
if "natIP" in config:
public_ip = True
self.instances.append(
Instance(
name=instance["name"],
id=instance["id"],
zone=zone,
public_ip=public_ip,
metadata=instance["metadata"],
shielded_enabled_vtpm=instance[
"shieldedInstanceConfig"
]["enableVtpm"],
shielded_enabled_integrity_monitoring=instance[
"shieldedInstanceConfig"
]["enableIntegrityMonitoring"],
service_accounts=instance["serviceAccounts"],
for zone in response.get("items", []):
self.zones.add(zone["name"])
request = self.client.zones().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_instances__(self):
for project_id in self.project_ids:
try:
for zone in self.zones:
request = self.client.instances().list(
project=project_id, zone=zone
)
while request is not None:
response = request.execute()
for instance in response.get("items", []):
public_ip = False
for interface in instance["networkInterfaces"]:
for config in interface.get("accessConfigs", []):
if "natIP" in config:
public_ip = True
self.instances.append(
Instance(
name=instance["name"],
id=instance["id"],
zone=zone,
public_ip=public_ip,
metadata=instance["metadata"],
shielded_enabled_vtpm=instance[
"shieldedInstanceConfig"
]["enableVtpm"],
shielded_enabled_integrity_monitoring=instance[
"shieldedInstanceConfig"
]["enableIntegrityMonitoring"],
service_accounts=instance["serviceAccounts"],
project_id=project_id,
)
)
request = self.client.instances().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{zone} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_networks__(self):
for project_id in self.project_ids:
try:
request = self.client.networks().list(project=project_id)
while request is not None:
response = request.execute()
for network in response.get("items", []):
self.networks.append(
Network(
name=network["name"],
id=network["id"],
project_id=project_id,
)
)
request = self.client.instances().list_next(
request = self.client.networks().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{zone} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_networks__(self):
try:
request = self.client.networks().list(project=self.project_id)
while request is not None:
response = request.execute()
for network in response.get("items", []):
self.networks.append(
Network(
name=network["name"],
id=network["id"],
)
)
request = self.client.networks().list_next(
previous_request=request, previous_response=response
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Instance(BaseModel):
@@ -103,6 +109,7 @@ class Instance(BaseModel):
id: str
zone: str
public_ip: bool
project_id: str
metadata: dict
shielded_enabled_vtpm: bool
shielded_enabled_integrity_monitoring: bool
@@ -112,3 +119,4 @@ class Instance(BaseModel):
class Network(BaseModel):
name: str
id: str
project_id: str

View File

@@ -7,7 +7,7 @@ class compute_shielded_vm_enabled(Check):
findings = []
for instance in compute_client.instances:
report = Check_Report_GCP(self.metadata())
report.project_id = compute_client.project_id
report.project_id = instance.project_id
report.resource_id = instance.id
report.resource_name = instance.name
report.location = instance.zone

View File

@@ -10,7 +10,7 @@ class iam_sa_no_administrative_privileges(Check):
findings = []
for account in iam_client.service_accounts:
report = Check_Report_GCP(self.metadata())
report.project_id = iam_client.project_id
report.project_id = account.project_id
report.resource_id = account.email
report.resource_name = account.name
report.location = iam_client.region

View File

@@ -7,7 +7,7 @@ class iam_sa_no_user_managed_keys(Check):
findings = []
for account in iam_client.service_accounts:
report = Check_Report_GCP(self.metadata())
report.project_id = iam_client.project_id
report.project_id = account.project_id
report.resource_id = account.email
report.resource_name = account.name
report.location = iam_client.region

View File

@@ -12,7 +12,7 @@ class iam_sa_user_managed_key_rotate_90_days(Check):
if key.type == "USER_MANAGED":
last_rotated = (datetime.now() - key.valid_after).days
report = Check_Report_GCP(self.metadata())
report.project_id = iam_client.project_id
report.project_id = account.project_id
report.resource_id = key.name
report.resource_name = account.email
report.location = iam_client.region

View File

@@ -11,7 +11,7 @@ class IAM:
def __init__(self, audit_info):
self.service = "iam"
self.api_version = "v1"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.region = "global"
self.client = generate_client(self.service, self.api_version, audit_info)
self.service_accounts = []
@@ -22,33 +22,35 @@ class IAM:
return self.client
def __get_service_accounts__(self):
try:
request = (
self.client.projects()
.serviceAccounts()
.list(name="projects/" + self.project_id)
)
while request is not None:
response = request.execute()
for account in response["accounts"]:
self.service_accounts.append(
ServiceAccount(
name=account["name"],
email=account["email"],
display_name=account.get("displayName", ""),
)
)
for project_id in self.project_ids:
try:
request = (
self.client.projects()
.serviceAccounts()
.list_next(previous_request=request, previous_response=response)
.list(name="projects/" + project_id)
)
while request is not None:
response = request.execute()
for account in response["accounts"]:
self.service_accounts.append(
ServiceAccount(
name=account["name"],
email=account["email"],
display_name=account.get("displayName", ""),
project_id=project_id,
)
)
request = (
self.client.projects()
.serviceAccounts()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_service_accounts_keys__(self):
try:
@@ -59,7 +61,7 @@ class IAM:
.keys()
.list(
name="projects/"
+ self.project_id
+ sa.project_id
+ "/serviceAccounts/"
+ sa.email
)
@@ -100,3 +102,4 @@ class ServiceAccount(BaseModel):
email: str
display_name: str
keys: list[Key] = []
project_id: str

View File

@@ -7,7 +7,7 @@ class kms_key_not_publicly_accessible(Check):
findings = []
for key in kms_client.crypto_keys:
report = Check_Report_GCP(self.metadata())
report.project_id = kms_client.project_id
report.project_id = key.project_id
report.resource_id = key.name
report.resource_name = key.name
report.location = key.location

View File

@@ -7,7 +7,7 @@ class kms_key_rotation_enabled(Check):
findings = []
for key in kms_client.crypto_keys:
report = Check_Report_GCP(self.metadata())
report.project_id = kms_client.project_id
report.project_id = key.project_id
report.resource_id = key.name
report.resource_name = key.name
report.location = key.location

View File

@@ -11,7 +11,7 @@ class KMS:
def __init__(self, audit_info):
self.service = "cloudkms"
self.api_version = "v1"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.region = "global"
self.client = generate_client(self.service, self.api_version, audit_info)
self.locations = []
@@ -26,33 +26,39 @@ class KMS:
return self.client
def __get_locations__(self):
try:
request = (
self.client.projects()
.locations()
.list(name="projects/" + self.project_id)
)
while request is not None:
response = request.execute()
for location in response["locations"]:
self.locations.append(location["name"])
for project_id in self.project_ids:
try:
request = (
self.client.projects()
.locations()
.list_next(previous_request=request, previous_response=response)
.list(name="projects/" + project_id)
)
while request is not None:
response = request.execute()
for location in response["locations"]:
self.locations.append(
KeyLocation(name=location["name"], project_id=project_id)
)
request = (
self.client.projects()
.locations()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_key_rings__(self):
try:
for location in self.locations:
for location in self.locations:
try:
request = (
self.client.projects().locations().keyRings().list(parent=location)
self.client.projects()
.locations()
.keyRings()
.list(parent=location.name)
)
while request is not None:
response = request.execute()
@@ -61,6 +67,7 @@ class KMS:
self.key_rings.append(
KeyRing(
name=ring["name"],
project_id=location.project_id,
)
)
@@ -70,14 +77,14 @@ class KMS:
.keyRings()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_crypto_keys__(self):
try:
for ring in self.key_rings:
for ring in self.key_rings:
try:
request = (
self.client.projects()
.locations()
@@ -95,6 +102,7 @@ class KMS:
location=key["name"].split("/")[3],
rotation_period=key.get("rotationPeriod"),
key_ring=ring.name,
project_id=ring.project_id,
)
)
@@ -105,14 +113,14 @@ class KMS:
.cryptoKeys()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_crypto_keys_iam_policy__(self):
try:
for key in self.crypto_keys:
for key in self.crypto_keys:
try:
request = (
self.client.projects()
.locations()
@@ -124,14 +132,20 @@ class KMS:
for binding in response.get("bindings", []):
key.members.extend(binding.get("members", []))
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class KeyLocation(BaseModel):
name: str
project_id: str
class KeyRing(BaseModel):
name: str
project_id: str
class CriptoKey(BaseModel):
@@ -140,3 +154,4 @@ class CriptoKey(BaseModel):
rotation_period: Optional[str]
key_ring: str
members: list = []
project_id: str

View File

@@ -10,32 +10,37 @@ class logging_log_metric_filter_and_alert_for_audit_configuration_changes_enable
):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'protoPayload.methodName="SetIamPolicy" AND protoPayload.serviceData.policyDelta.auditConfigDeltas:*'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'protoPayload.methodName="SetIamPolicy" AND protoPayload.serviceData.policyDelta.auditConfigDeltas:*'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'resource.type="gcs_bucket" AND protoPayload.methodName="storage.setIamPermissions"'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'resource.type="gcs_bucket" AND protoPayload.methodName="storage.setIamPermissions"'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_custom_role_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'resource.type="iam_role" AND (protoPayload.methodName="google.iam.admin.v1.CreateRole" OR protoPayload.methodName="google.iam.admin.v1.DeleteRole" OR protoPayload.methodName="google.iam.admin.v1.UpdateRole")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'resource.type="iam_role" AND (protoPayload.methodName="google.iam.admin.v1.CreateRole" OR protoPayload.methodName="google.iam.admin.v1.DeleteRole" OR protoPayload.methodName="google.iam.admin.v1.UpdateRole")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'(protoPayload.serviceName="cloudresourcemanager.googleapis.com") AND (ProjectOwnership OR projectOwnerInvitee) OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="REMOVE" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner") OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="ADD" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'(protoPayload.serviceName="cloudresourcemanager.googleapis.com") AND (ProjectOwnership OR projectOwnerInvitee) OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="REMOVE" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner") OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="ADD" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -10,32 +10,34 @@ class logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes
):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'protoPayload.methodName="cloudsql.instances.update"'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if 'protoPayload.methodName="cloudsql.instances.update"' in metric.filter:
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'resource.type="gce_firewall_rule" AND (protoPayload.methodName:"compute.firewalls.patch" OR protoPayload.methodName:"compute.firewalls.insert" OR protoPayload.methodName:"compute.firewalls.delete")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'resource.type="gce_firewall_rule" AND (protoPayload.methodName:"compute.firewalls.patch" OR protoPayload.methodName:"compute.firewalls.insert" OR protoPayload.methodName:"compute.firewalls.delete")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'resource.type="gce_network" AND (protoPayload.methodName:"compute.networks.insert" OR protoPayload.methodName:"compute.networks.patch" OR protoPayload.methodName:"compute.networks.delete" OR protoPayload.methodName:"compute.networks.removePeering" OR protoPayload.methodName:"compute.networks.addPeering")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'resource.type="gce_network" AND (protoPayload.methodName:"compute.networks.insert" OR protoPayload.methodName:"compute.networks.patch" OR protoPayload.methodName:"compute.networks.delete" OR protoPayload.methodName:"compute.networks.removePeering" OR protoPayload.methodName:"compute.networks.addPeering")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -8,32 +8,37 @@ from prowler.providers.gcp.services.monitoring.monitoring_client import (
class logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled(Check):
def execute(self) -> Check_Report_GCP:
findings = []
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = "There are no log metric filters or alerts associated."
if logging_client.metrics:
for metric in logging_client.metrics:
if (
'resource.type="gce_route" AND (protoPayload.methodName:"compute.routes.delete" OR protoPayload.methodName:"compute.routes.insert")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated."
break
findings.append(report)
projects_with_metric = set()
for metric in logging_client.metrics:
if (
'resource.type="gce_route" AND (protoPayload.methodName:"compute.routes.delete" OR protoPayload.methodName:"compute.routes.insert")'
in metric.filter
):
report = Check_Report_GCP(self.metadata())
projects_with_metric.add(metric.project_id)
report.project_id = metric.project_id
report.resource_id = metric.name
report.resource_name = metric.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
report = Check_Report_GCP(self.metadata())
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."
findings.append(report)
return findings

View File

@@ -10,7 +10,8 @@ class Logging:
self.service = "logging"
self.api_version = "v2"
self.region = "global"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.default_project_id = audit_info.default_project_id
self.client = generate_client(self.service, self.api_version, audit_info)
self.sinks = []
self.metrics = []
@@ -18,65 +19,71 @@ class Logging:
self.__get_metrics__()
def __get_sinks__(self):
try:
request = self.client.sinks().list(parent=f"projects/{self.project_id}")
while request is not None:
response = request.execute()
for project_id in self.project_ids:
try:
request = self.client.sinks().list(parent=f"projects/{project_id}")
while request is not None:
response = request.execute()
for sink in response.get("sinks", []):
self.sinks.append(
Sink(
name=sink["name"],
destination=sink["destination"],
filter=sink.get("filter", "all"),
for sink in response.get("sinks", []):
self.sinks.append(
Sink(
name=sink["name"],
destination=sink["destination"],
filter=sink.get("filter", "all"),
project_id=project_id,
)
)
)
request = self.client.sinks().list_next(
previous_request=request, previous_response=response
request = self.client.sinks().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_metrics__(self):
try:
request = (
self.client.projects()
.metrics()
.list(parent=f"projects/{self.project_id}")
)
while request is not None:
response = request.execute()
for metric in response.get("metrics", []):
self.metrics.append(
Metric(
name=metric["name"],
type=metric["metricDescriptor"]["type"],
filter=metric["filter"],
)
)
for project_id in self.project_ids:
try:
request = (
self.client.projects()
.metrics()
.list_next(previous_request=request, previous_response=response)
.list(parent=f"projects/{project_id}")
)
while request is not None:
response = request.execute()
for metric in response.get("metrics", []):
self.metrics.append(
Metric(
name=metric["name"],
type=metric["metricDescriptor"]["type"],
filter=metric["filter"],
project_id=project_id,
)
)
request = (
self.client.projects()
.metrics()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Sink(BaseModel):
name: str
destination: str
filter: str
project_id: str
class Metric(BaseModel):
name: str
type: str
filter: str
project_id: str

View File

@@ -5,28 +5,30 @@ from prowler.providers.gcp.services.logging.logging_client import logging_client
class logging_sink_created(Check):
def execute(self) -> Check_Report_GCP:
findings = []
if not logging_client.sinks:
projects_with_sink = set()
for sink in logging_client.sinks:
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = logging_client.project_id
report.resource_name = ""
projects_with_sink.add(sink.project_id)
report.project_id = sink.project_id
report.resource_id = sink.name
report.resource_name = sink.name
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = (
"There are no logging sinks to export copies of all the log entries"
)
else:
for sink in logging_client.sinks:
report.status_extended = f"Sink {sink.name} is enabled but not exporting copies of all the log entries in project {sink.project_id}"
if sink.filter == "all":
report.status = "PASS"
report.status_extended = f"Sink {sink.name} is enabled exporting copies of all the log entries in project {sink.project_id}"
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_sink:
report = Check_Report_GCP(self.metadata())
report.project_id = logging_client.project_id
report.resource_id = sink.name
report.resource_name = sink.name
report.project_id = project
report.resource_id = project
report.resource_name = ""
report.location = logging_client.region
report.status = "FAIL"
report.status_extended = f"Sink {sink.name} is enabled but not exporting copies of all the log entries"
if sink.filter == "all":
report.status = "PASS"
report.status_extended = f"Sink {sink.name} is enabled exporting copies of all the log entries"
report.status_extended = f"There are no logging sinks to export copies of all the log entries in project {project}"
findings.append(report)
return findings

View File

@@ -10,43 +10,45 @@ class Monitoring:
self.service = "monitoring"
self.api_version = "v3"
self.region = "global"
self.project_id = audit_info.project_id
self.project_ids = audit_info.project_ids
self.client = generate_client(self.service, self.api_version, audit_info)
self.alert_policies = []
self.__get_alert_policies__()
def __get_alert_policies__(self):
try:
request = (
self.client.projects()
.alertPolicies()
.list(name=f"projects/{self.project_id}")
)
while request is not None:
response = request.execute()
for policy in response.get("alertPolicies", []):
filters = []
for condition in policy["conditions"]:
filters.append(condition["conditionThreshold"]["filter"])
self.alert_policies.append(
AlertPolicy(
name=policy["name"],
display_name=policy["displayName"],
enabled=policy["enabled"],
filters=filters,
)
)
for project_id in self.project_ids:
try:
request = (
self.client.projects()
.alertPolicies()
.list_next(previous_request=request, previous_response=response)
.list(name=f"projects/{project_id}")
)
while request is not None:
response = request.execute()
for policy in response.get("alertPolicies", []):
filters = []
for condition in policy["conditions"]:
filters.append(condition["conditionThreshold"]["filter"])
self.alert_policies.append(
AlertPolicy(
name=policy["name"],
display_name=policy["displayName"],
enabled=policy["enabled"],
filters=filters,
project_id=project_id,
)
)
request = (
self.client.projects()
.alertPolicies()
.list_next(previous_request=request, previous_response=response)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class AlertPolicy(BaseModel):
@@ -54,3 +56,4 @@ class AlertPolicy(BaseModel):
display_name: str
filters: list[str]
enabled: bool
project_id: str

View File

@@ -980,3 +980,14 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.provider == "gcp"
assert parsed.credentials_file == file
def test_parser_gcp_project_ids(self):
argument = "--project-ids"
project_1 = "test_project_1"
project_2 = "test_project_2"
command = [prowler_command, "gcp", argument, project_1, project_2]
parsed = self.parser.parse(command)
assert parsed.provider == "gcp"
assert len(parsed.project_ids) == 2
assert parsed.project_ids[0] == project_1
assert parsed.project_ids[1] == project_2

View File

@@ -45,7 +45,8 @@ class Test_Slack_Integration:
)
gcp_audit_info = GCP_Audit_Info(
credentials=None,
project_id="test-project",
default_project_id="test-project1",
project_ids=["test-project1", "test-project2"],
audit_resources=None,
audit_metadata=None,
)
@@ -69,7 +70,7 @@ class Test_Slack_Integration:
aws_logo,
)
assert create_message_identity("gcp", gcp_audit_info) == (
f"GCP Project *{gcp_audit_info.project_id}*",
f"GCP Projects *{', '.join(gcp_audit_info.project_ids)}*",
gcp_logo,
)
assert create_message_identity("azure", azure_audit_info) == (

View File

@@ -83,6 +83,10 @@ def mock_set_gcp_credentials(*_):
return (None, "project")
def mock_get_project_ids(*_):
return ["project"]
class Test_Set_Audit_Info:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -166,6 +170,7 @@ class Test_Set_Audit_Info:
assert isinstance(audit_info, Azure_Audit_Info)
@patch.object(GCP_Provider, "__set_credentials__", new=mock_set_gcp_credentials)
@patch.object(GCP_Provider, "get_project_ids", new=mock_get_project_ids)
@patch.object(Audit_Info, "print_gcp_credentials", new=mock_print_audit_credentials)
def test_set_audit_info_gcp(self):
provider = "gcp"
@@ -179,6 +184,7 @@ class Test_Set_Audit_Info:
"subscriptions": None,
# We need to set exactly one auth method
"credentials_file": None,
"project_ids": ["project"],
}
audit_info = set_provider_audit_info(provider, arguments)

View File

@@ -45,7 +45,8 @@ class Test_Common_Output_Options:
def set_mocked_gcp_audit_info(self):
audit_info = GCP_Audit_Info(
credentials=None,
project_id="test-project",
default_project_id="test-project1",
project_ids=["test-project1", "test-project2"],
audit_resources=None,
audit_metadata=None,
)
@@ -347,7 +348,7 @@ class Test_Common_Output_Options:
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>GCP Project ID:</b> {audit_info.project_id}
<b>GCP Project IDs:</b> {', '.join(audit_info.project_ids)}
</li>
</ul>
</div>

View File

@@ -32,11 +32,12 @@ class Test_compute_default_service_account_in_use:
metadata={},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[{"email": "123-compute@developer.gserviceaccount.com"}],
service_accounts=[{"email": "custom@developer.gserviceaccount.com"}],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -72,10 +73,11 @@ class Test_compute_default_service_account_in_use:
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -111,10 +113,11 @@ class Test_compute_default_service_account_in_use:
service_accounts=[
{"email": f"{GCP_PROJECT_ID}-compute@developer.gserviceaccount.com"}
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(

View File

@@ -35,10 +35,11 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
service_accounts=[
{"email": "123-compute@developer.gserviceaccount.com", "scopes": []}
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -77,10 +78,11 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
"scopes": ["https://www.googleapis.com/auth/cloud-platform"],
}
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -119,10 +121,11 @@ class Test_compute_default_service_account_in_use_with_full_api_access:
"scopes": ["https://www.googleapis.com/auth/cloud-platform"],
}
],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(

View File

@@ -34,6 +34,7 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
@@ -71,6 +72,7 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
@@ -108,6 +110,7 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
@@ -145,6 +148,7 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
@@ -182,6 +186,7 @@ class Test_compute_serial_ports_in_use:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock

View File

@@ -7,7 +7,7 @@ GCP_PROJECT_ID = "123456789012"
class Test_compute_shielded_vm_enabled:
def test_compute_no_instances(self):
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = []
with mock.patch(
@@ -34,10 +34,11 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -71,10 +72,11 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=False,
shielded_enabled_integrity_monitoring=True,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(
@@ -108,10 +110,11 @@ class Test_compute_shielded_vm_enabled:
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=False,
service_accounts=[],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock
compute_client.project_id = GCP_PROJECT_ID
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with mock.patch(