mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 06:45:08 +00:00
417 lines
14 KiB
Python
417 lines
14 KiB
Python
import boto3
|
|
import botocore
|
|
import pytest
|
|
from boto3 import session
|
|
from mock import patch
|
|
from moto import mock_ec2, mock_resourcegroupstaggingapi
|
|
|
|
from prowler.config.config import default_config_file_path
|
|
from prowler.providers.aws.lib.audit_info.models import AWS_Assume_Role, AWS_Audit_Info
|
|
from prowler.providers.azure.azure_provider import Azure_Provider
|
|
from prowler.providers.azure.lib.audit_info.models import (
|
|
Azure_Audit_Info,
|
|
Azure_Identity_Info,
|
|
Azure_Region_Config,
|
|
)
|
|
from prowler.providers.azure.lib.exception.exception import AzureException
|
|
from prowler.providers.common.audit_info import (
|
|
Audit_Info,
|
|
get_tagged_resources,
|
|
set_provider_audit_info,
|
|
)
|
|
from prowler.providers.common.models import Audit_Metadata
|
|
from prowler.providers.gcp.gcp_provider import GCP_Provider
|
|
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
|
|
|
|
# Fixture constants shared by the tests in this module.
EXAMPLE_AMI_ID = "ami-12c6146b"
AWS_ACCOUNT_NUMBER = "123456789012"

# Azure audit info with every optional field unset; the Azure tests patch it
# over prowler.providers.common.audit_info.azure_audit_info.
mock_azure_audit_info = Azure_Audit_Info(
    credentials=None,
    identity=Azure_Identity_Info(),
    audit_metadata=None,
    audit_resources=None,
    audit_config=None,
    azure_region_config=Azure_Region_Config(),
)

mock_set_audit_info = Audit_Info()

# Mocking GetCallerIdentity for China and GovCloud: keep a reference to the
# real botocore implementation so the mocks below can delegate to it.
make_api_call = botocore.client.BaseClient._make_api_call
|
|
|
|
|
|
def mock_get_caller_identity_china(self, operation_name, kwarg):
    """Replacement for ``BaseClient._make_api_call`` faking an ``aws-cn`` identity.

    ``GetCallerIdentity`` is answered with a canned response whose ARN uses
    the ``aws-cn`` partition; every other operation is forwarded to the
    original ``make_api_call``.
    """
    # Anything that is not GetCallerIdentity goes to the real implementation.
    if operation_name != "GetCallerIdentity":
        return make_api_call(self, operation_name, kwarg)

    return {
        "UserId": "XXXXXXXXXXXXXXXXXXXXX",
        "Account": AWS_ACCOUNT_NUMBER,
        "Arn": f"arn:aws-cn:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
    }
|
|
|
|
|
|
def mock_get_caller_identity_gov_cloud(self, operation_name, kwarg):
    """Replacement for ``BaseClient._make_api_call`` faking an ``aws-us-gov`` identity.

    ``GetCallerIdentity`` is answered with a canned response whose ARN uses
    the ``aws-us-gov`` partition; every other operation is forwarded to the
    original ``make_api_call``.
    """
    # Anything that is not GetCallerIdentity goes to the real implementation.
    if operation_name != "GetCallerIdentity":
        return make_api_call(self, operation_name, kwarg)

    return {
        "UserId": "XXXXXXXXXXXXXXXXXXXXX",
        "Account": AWS_ACCOUNT_NUMBER,
        "Arn": f"arn:aws-us-gov:iam::{AWS_ACCOUNT_NUMBER}:user/test-user",
    }
|
|
|
|
|
|
def mock_validate_credentials(*_):
    """Return a fixed caller-identity dict, ignoring any arguments.

    Patched in place of ``validate_aws_credentials`` by the AWS test.
    """
    return {
        "Arn": "arn:aws:iam::123456789012:user/test",
        "Account": "123456789012",
        "UserId": "test",
    }
|
|
|
|
|
|
def mock_print_audit_credentials(*_):
    """No-op stand-in for the credential-printing helpers; accepts any arguments."""
    return None
|
|
|
|
|
|
def mock_set_identity_info(*_):
    """Return a default-constructed ``Azure_Identity_Info``, ignoring any arguments."""
    identity = Azure_Identity_Info()
    return identity
|
|
|
|
|
|
def mock_set_azure_credentials(*_):
    """Return an empty dict as fake Azure credentials, ignoring any arguments."""
    return dict()
|
|
|
|
|
|
def mock_set_gcp_credentials(*_):
    """Return a fake ``(credentials, project)`` pair, ignoring any arguments."""
    credentials, project = None, "project"
    return (credentials, project)
|
|
|
|
|
|
def mock_get_project_ids(*_):
    """Return a single-element list of fake GCP project ids, ignoring any arguments."""
    project_ids = ["project"]
    return project_ids
|
|
|
|
|
|
class Test_Set_Audit_Info:
|
|
# Mocked Audit Info
def set_mocked_audit_info(self):
    """Build the ``AWS_Audit_Info`` fixture used by the AWS tests.

    The fixture targets ``AWS_ACCOUNT_NUMBER`` in the ``aws`` partition, with
    a default boto3 session, no assumed role, and zeroed audit metadata.
    """
    boto_session = session.Session(
        profile_name=None,
        botocore_session=None,
    )
    no_assumed_role = AWS_Assume_Role(
        role_arn=None,
        session_duration=None,
        external_id=None,
        mfa_enabled=None,
    )
    empty_metadata = Audit_Metadata(
        services_scanned=0,
        expected_checks=[],
        completed_checks=0,
        audit_progress=0,
    )

    return AWS_Audit_Info(
        session_config=None,
        original_session=None,
        audit_session=boto_session,
        audited_account=AWS_ACCOUNT_NUMBER,
        audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
        audited_user_id=None,
        audited_partition="aws",
        audited_identity_arn="arn:aws:iam::123456789012:user/test",
        profile=None,
        profile_region="eu-west-1",
        credentials=None,
        assumed_role_info=no_assumed_role,
        audited_regions=["eu-west-2", "eu-west-1"],
        organizations_metadata=None,
        audit_resources=None,
        mfa_enabled=False,
        audit_metadata=empty_metadata,
    )
|
|
|
|
@patch(
    "prowler.providers.common.audit_info.azure_audit_info",
    new=mock_azure_audit_info,
)
@patch.object(Azure_Provider, "__get_credentials__", new=mock_set_azure_credentials)
@patch.object(Azure_Provider, "__get_identity_info__", new=mock_set_identity_info)
def test_set_audit_info_azure(self):
    """With only ``az_cli_auth`` enabled, the Azure provider yields an ``Azure_Audit_Info``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "subscriptions": None,
        # We need to set exactly one auth method
        "az_cli_auth": True,
        "sp_env_auth": None,
        "browser_auth": None,
        "managed_entity_auth": None,
        "config_file": default_config_file_path,
        "azure_region": "AzureCloud",
    }

    result = set_provider_audit_info("azure", arguments)

    assert isinstance(result, Azure_Audit_Info)
|
|
|
|
@patch(
    "prowler.providers.common.audit_info.azure_audit_info",
    new=mock_azure_audit_info,
)
@patch.object(Azure_Provider, "__get_credentials__", new=mock_set_azure_credentials)
@patch.object(Azure_Provider, "__get_identity_info__", new=mock_set_identity_info)
def test_set_azure_audit_info_not_auth_methods(self):
    """With no auth method set, ``set_azure_audit_info`` raises ``AzureException``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "subscriptions": None,
        # Every auth method is deliberately left unset for this test
        "az_cli_auth": None,
        "sp_env_auth": None,
        "browser_auth": None,
        "managed_entity_auth": None,
        "config_file": default_config_file_path,
        "azure_region": "AzureCloud",
    }
    expected_message = "Azure provider requires at least one authentication method set: [--az-cli-auth | --sp-env-auth | --browser-auth | --managed-identity-auth]"

    with pytest.raises(AzureException) as raised:
        Audit_Info().set_azure_audit_info(arguments)

    assert raised.type == AzureException
    assert raised.value.args[0] == expected_message
|
|
|
|
@patch(
    "prowler.providers.common.audit_info.azure_audit_info",
    new=mock_azure_audit_info,
)
@patch.object(Azure_Provider, "__get_credentials__", new=mock_set_azure_credentials)
@patch.object(Azure_Provider, "__get_identity_info__", new=mock_set_identity_info)
def test_set_azure_audit_info_browser_auth_but_not_tenant_id(self):
    """``browser_auth`` without a ``tenant_id`` argument raises ``AzureException``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "subscriptions": None,
        # We need to set exactly one auth method
        "az_cli_auth": None,
        "sp_env_auth": None,
        "browser_auth": True,
        "managed_entity_auth": None,
        "config_file": default_config_file_path,
        "azure_region": "AzureCloud",
    }
    expected_message = "Azure Tenant ID (--tenant-id) is required only for browser authentication mode"

    with pytest.raises(AzureException) as raised:
        Audit_Info().set_azure_audit_info(arguments)

    assert raised.type == AzureException
    assert raised.value.args[0] == expected_message
|
|
|
|
@patch(
    "prowler.providers.common.audit_info.azure_audit_info",
    new=mock_azure_audit_info,
)
@patch.object(Azure_Provider, "__get_credentials__", new=mock_set_azure_credentials)
@patch.object(Azure_Provider, "__get_identity_info__", new=mock_set_identity_info)
def test_set_azure_audit_info_tenant_id_but_no_browser_auth(self):
    """A ``tenant_id`` combined with a non-browser auth method raises ``AzureException``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "subscriptions": None,
        # We need to set exactly one auth method
        "az_cli_auth": True,
        "sp_env_auth": None,
        "browser_auth": None,
        "managed_entity_auth": None,
        "config_file": default_config_file_path,
        "azure_region": "AzureCloud",
        "tenant_id": "test-tenant-id",
    }
    expected_message = "Azure Tenant ID (--tenant-id) is required only for browser authentication mode"

    with pytest.raises(AzureException) as raised:
        Audit_Info().set_azure_audit_info(arguments)

    assert raised.type == AzureException
    assert raised.value.args[0] == expected_message
|
|
|
|
@patch.object(GCP_Provider, "__set_credentials__", new=mock_set_gcp_credentials)
@patch.object(GCP_Provider, "get_project_ids", new=mock_get_project_ids)
@patch.object(Audit_Info, "print_gcp_credentials", new=mock_print_audit_credentials)
def test_set_audit_info_gcp(self):
    """With credentials and project lookup mocked, the GCP provider yields a ``GCP_Audit_Info``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "subscriptions": None,
        # GCP-specific arguments; credential loading is mocked above
        "credentials_file": None,
        "project_ids": ["project"],
        "config_file": default_config_file_path,
    }

    result = set_provider_audit_info("gcp", arguments)

    assert isinstance(result, GCP_Audit_Info)
|
|
|
|
@mock_resourcegroupstaggingapi
@mock_ec2
def test_get_tagged_resources(self):
    """``get_tagged_resources`` returns exactly the resources carrying a given tag.

    One EC2 instance (tagged ``MY_TAG1``, ``MY_TAG2`` and ``ami``) and one
    image created from it (tagged ``ami``) are set up in ``eu-central-1``
    under moto. Filtering on ``ami=test`` must return both resources;
    filtering on ``MY_TAG1=MY_VALUE1`` must return only the instance.
    """
    with patch(
        "prowler.providers.common.audit_info.current_audit_info",
        new=self.set_mocked_audit_info(),
    ) as mock_audit_info:
        client = boto3.client("ec2", region_name="eu-central-1")
        instances = client.run_instances(
            ImageId=EXAMPLE_AMI_ID,
            MinCount=1,
            MaxCount=1,
            InstanceType="t2.micro",
            TagSpecifications=[
                {
                    "ResourceType": "instance",
                    "Tags": [
                        {"Key": "MY_TAG1", "Value": "MY_VALUE1"},
                        {"Key": "MY_TAG2", "Value": "MY_VALUE2"},
                    ],
                },
                {
                    "ResourceType": "instance",
                    "Tags": [{"Key": "ami", "Value": "test"}],
                },
            ],
        )
        instance_id = instances["Instances"][0]["InstanceId"]
        image_id = client.create_image(Name="testami", InstanceId=instance_id)[
            "ImageId"
        ]
        client.create_tags(
            Resources=[image_id], Tags=[{"Key": "ami", "Value": "test"}]
        )

        # Point the fixture at the region where the resources were created.
        mock_audit_info.audited_regions = ["eu-central-1"]
        mock_audit_info.audit_session = boto3.session.Session()

        # Run each tag query once and assert on the stored result rather than
        # repeating the same lookup for every assertion.
        ami_tagged = get_tagged_resources(["ami=test"], mock_audit_info)
        assert len(ami_tagged) == 2
        assert image_id in str(ami_tagged)
        assert instance_id in str(ami_tagged)

        my_tag_tagged = get_tagged_resources(["MY_TAG1=MY_VALUE1"], mock_audit_info)
        assert len(my_tag_tagged) == 1
        assert instance_id in str(my_tag_tagged)
|
|
|
|
@mock_ec2
@patch(
    "prowler.providers.common.audit_info.validate_aws_credentials",
    new=mock_validate_credentials,
)
@patch(
    "prowler.providers.common.audit_info.print_aws_credentials",
    new=mock_print_audit_credentials,
)
def test_set_audit_info_aws(self):
    """With credential validation and printing mocked, the AWS provider yields an ``AWS_Audit_Info``."""
    arguments = {
        "profile": None,
        "role": None,
        "session_duration": None,
        "external_id": None,
        "regions": None,
        "organizations_role": None,
        "config_file": default_config_file_path,
    }

    with patch(
        "prowler.providers.common.audit_info.current_audit_info",
        new=self.set_mocked_audit_info(),
    ):
        result = set_provider_audit_info("aws", arguments)
        assert isinstance(result, AWS_Audit_Info)
|
|
|
|
def test_set_audit_info_aws_bad_session_duration(self):
    """A ``session_duration`` of 100 makes ``set_provider_audit_info`` exit.

    The exit message is not captured here; presumably it is
    "Value for -T option must be between 900 and 43200" (verify against the
    provider code).
    """
    with patch(
        "prowler.providers.common.audit_info.current_audit_info",
        new=self.set_mocked_audit_info(),
    ):
        provider = "aws"
        arguments = {
            "profile": None,
            "role": None,
            "session_duration": 100,
            "external_id": None,
            "regions": None,
            "organizations_role": None,
        }

        with pytest.raises(SystemExit) as exception:
            _ = set_provider_audit_info(provider, arguments)
        # Check the captured exception type; an isinstance check on the
        # ExceptionInfo wrapper itself would always pass.
        assert exception.type == SystemExit
|
|
|
|
def test_set_audit_info_aws_session_duration_without_role(self):
    """A ``session_duration`` given without a ``role`` makes ``set_provider_audit_info`` exit.

    The exit message is not captured here; presumably it is
    "To use -I/-T options -R option is needed" (verify against the provider
    code).
    """
    with patch(
        "prowler.providers.common.audit_info.current_audit_info",
        new=self.set_mocked_audit_info(),
    ):
        provider = "aws"
        arguments = {
            "profile": None,
            "role": None,
            "session_duration": 1000,
            "external_id": None,
            "regions": None,
            "organizations_role": None,
        }

        with pytest.raises(SystemExit) as exception:
            _ = set_provider_audit_info(provider, arguments)
        # Check the captured exception type; an isinstance check on the
        # ExceptionInfo wrapper itself would always pass.
        assert exception.type == SystemExit
|
|
|
|
def test_set_audit_info_external_id_without_role(self):
    """An ``external_id`` given without a ``role`` makes ``set_provider_audit_info`` exit.

    The exit message is not captured here; presumably it is
    "To use -I/-T options -R option is needed" (verify against the provider
    code).
    """
    with patch(
        "prowler.providers.common.audit_info.current_audit_info",
        new=self.set_mocked_audit_info(),
    ):
        provider = "aws"
        arguments = {
            "profile": None,
            "role": None,
            "session_duration": 3600,
            "external_id": "test-external-id",
            "regions": None,
            "organizations_role": None,
        }

        with pytest.raises(SystemExit) as exception:
            _ = set_provider_audit_info(provider, arguments)
        # Check the captured exception type; an isinstance check on the
        # ExceptionInfo wrapper itself would always pass.
        assert exception.type == SystemExit
|