mirror of
https://github.com/ghndrx/prowler.git
synced 2026-02-10 14:55:00 +00:00
fix(vpc tests): mock current_audit_info (#2214)
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
import json
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from moto import mock_ec2
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_REGION = "us-east-1"
|
||||
ACCOUNT_ID = "123456789012"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
|
||||
def mock_get_config_var(config_var):
|
||||
@@ -15,27 +17,52 @@ def mock_get_config_var(config_var):
|
||||
|
||||
|
||||
class Test_vpc_endpoint_connections_trust_boundaries:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
|
||||
return audit_info
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_no_endpoints(self):
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 0
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_endpoint_with_full_access(self):
|
||||
@@ -63,32 +90,38 @@ class Test_vpc_endpoint_connections_trust_boundaries:
|
||||
}
|
||||
),
|
||||
)
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access."
|
||||
)
|
||||
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
assert result[0].region == AWS_REGION
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
)
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_endpoint_with_trusted_account(self):
|
||||
@@ -116,33 +149,37 @@ class Test_vpc_endpoint_connections_trust_boundaries:
|
||||
}
|
||||
),
|
||||
)
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info.audited_account = ACCOUNT_ID
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found trusted account {ACCOUNT_ID} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
assert result[0].region == AWS_REGION
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
)
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_endpoint_with_untrusted_account(self):
|
||||
@@ -170,32 +207,37 @@ class Test_vpc_endpoint_connections_trust_boundaries:
|
||||
}
|
||||
),
|
||||
)
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info.audited_account = ACCOUNT_ID
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
)
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_endpoint_with_config_trusted_account(self):
|
||||
@@ -223,37 +265,38 @@ class Test_vpc_endpoint_connections_trust_boundaries:
|
||||
}
|
||||
),
|
||||
)
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info.audited_account = ACCOUNT_ID
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var",
|
||||
new=mock_get_config_var,
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var",
|
||||
new=mock_get_config_var,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
|
||||
vpc_endpoint_connections_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_connections_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
)
|
||||
assert result[0].region == AWS_REGION
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
|
||||
)
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
from unittest import mock
|
||||
|
||||
import botocore
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from mock import patch
|
||||
from moto import mock_ec2, mock_elbv2
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_REGION = "us-east-1"
|
||||
ACCOUNT_ID = "123456789012"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
# Mocking VPC Calls
|
||||
make_api_call = botocore.client.BaseClient._make_api_call
|
||||
@@ -24,7 +26,7 @@ def mock_make_api_call(self, operation_name, kwarg):
|
||||
{
|
||||
"ServiceId": "vpce-svc-4b919ac5",
|
||||
"ServiceName": "string",
|
||||
"Owner": ACCOUNT_ID,
|
||||
"Owner": AWS_ACCOUNT_NUMBER,
|
||||
"StageName": "test-stage",
|
||||
}
|
||||
]
|
||||
@@ -34,27 +36,52 @@ def mock_make_api_call(self, operation_name, kwarg):
|
||||
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
|
||||
return audit_info
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_no_endpoint_services(self):
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
|
||||
vpc_endpoint_services_allowed_principals_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
|
||||
vpc_endpoint_services_allowed_principals_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 2 # one endpoint per region
|
||||
assert len(result) == 2 # one endpoint per region
|
||||
|
||||
@mock_ec2
|
||||
@mock_elbv2
|
||||
@@ -84,33 +111,35 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
|
||||
# NetworkLoadBalancerArns=[lb_arn]
|
||||
# )
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
|
||||
vpc_endpoint_services_allowed_principals_trust_boundaries,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
|
||||
vpc_endpoint_services_allowed_principals_trust_boundaries,
|
||||
)
|
||||
|
||||
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
|
||||
result = check.execute()
|
||||
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 2 # one per region
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][
|
||||
"ServiceId"
|
||||
]
|
||||
)
|
||||
assert len(result) == 2 # one per region
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_id
|
||||
== ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][
|
||||
"ServiceId"
|
||||
]
|
||||
)
|
||||
|
||||
@@ -1,34 +1,62 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from boto3 import client, session
|
||||
from moto import mock_ec2
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_REGION = "us-east-1"
|
||||
ACCOUNT_ID = "123456789012"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
|
||||
class Test_vpc_flow_logs_enabled:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
return audit_info
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_only_default_vpcs(self):
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
|
||||
vpc_flow_logs_enabled,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
|
||||
vpc_flow_logs_enabled,
|
||||
)
|
||||
|
||||
check = vpc_flow_logs_enabled()
|
||||
result = check.execute()
|
||||
check = vpc_flow_logs_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 2 # Number of AWS regions, one default VPC per region
|
||||
assert (
|
||||
len(result) == 2
|
||||
) # Number of AWS regions, one default VPC per region
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_with_flow_logs(self):
|
||||
@@ -43,18 +71,16 @@ class Test_vpc_flow_logs_enabled:
|
||||
TrafficType="ALL",
|
||||
LogDestinationType="cloud-watch-logs",
|
||||
LogGroupName="test_logs",
|
||||
DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
|
||||
DeliverLogsPermissionArn="arn:aws:iam::"
|
||||
+ AWS_ACCOUNT_NUMBER
|
||||
+ ":role/test-role",
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
|
||||
@@ -81,15 +107,11 @@ class Test_vpc_flow_logs_enabled:
|
||||
|
||||
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
|
||||
|
||||
@@ -1,34 +1,61 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client, resource
|
||||
from boto3 import client, resource, session
|
||||
from moto import mock_ec2
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
|
||||
|
||||
AWS_REGION = "us-east-1"
|
||||
ACCOUNT_ID = "123456789012"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
|
||||
class Test_vpc_peering_routing_tables_with_least_privilege:
|
||||
def set_mocked_audit_info(self):
|
||||
audit_info = AWS_Audit_Info(
|
||||
session_config=None,
|
||||
original_session=None,
|
||||
audit_session=session.Session(
|
||||
profile_name=None,
|
||||
botocore_session=None,
|
||||
),
|
||||
audited_account=AWS_ACCOUNT_NUMBER,
|
||||
audited_user_id=None,
|
||||
audited_partition="aws",
|
||||
audited_identity_arn=None,
|
||||
profile=None,
|
||||
profile_region=None,
|
||||
credentials=None,
|
||||
assumed_role_info=None,
|
||||
audited_regions=["us-east-1", "eu-west-1"],
|
||||
organizations_metadata=None,
|
||||
audit_resources=None,
|
||||
)
|
||||
|
||||
return audit_info
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_no_peering_connections(self):
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 0
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_comply_peering_connection_(self):
|
||||
@@ -58,40 +85,44 @@ class Test_vpc_peering_routing_tables_with_least_privilege:
|
||||
DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC, Route
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
|
||||
service_client.vpc_peering_connections[0].route_tables = [
|
||||
Route(
|
||||
id=main_route_table_id,
|
||||
destination_cidrs=["10.12.23.44/32"],
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
]
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == len(
|
||||
ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"]
|
||||
)
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access."
|
||||
)
|
||||
assert result[0].resource_id == vpc_pcx_id
|
||||
assert result[0].region == AWS_REGION
|
||||
service_client.vpc_peering_connections[0].route_tables = [
|
||||
Route(
|
||||
id=main_route_table_id,
|
||||
destination_cidrs=["10.12.23.44/32"],
|
||||
)
|
||||
]
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == len(
|
||||
ec2_client.describe_vpc_peering_connections()[
|
||||
"VpcPeeringConnections"
|
||||
]
|
||||
)
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access."
|
||||
)
|
||||
assert result[0].resource_id == vpc_pcx_id
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
@mock_ec2
|
||||
def test_vpc_not_comply_peering_connection_(self):
|
||||
@@ -121,37 +152,41 @@ class Test_vpc_peering_routing_tables_with_least_privilege:
|
||||
DestinationCidrBlock="10.0.0.0/16", VpcPeeringConnectionId=vpc_pcx_id
|
||||
)
|
||||
|
||||
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
|
||||
from prowler.providers.aws.services.vpc.vpc_service import VPC, Route
|
||||
|
||||
current_audit_info.audited_partition = "aws"
|
||||
current_audit_info.audited_regions = ["eu-west-1", "us-east-1"]
|
||||
current_audit_info = self.set_mocked_audit_info()
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
|
||||
service_client.vpc_peering_connections[0].route_tables = [
|
||||
Route(
|
||||
id=main_route_table_id,
|
||||
destination_cidrs=["10.0.0.0/16"],
|
||||
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
|
||||
new=current_audit_info,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
|
||||
new=VPC(current_audit_info),
|
||||
) as service_client:
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
|
||||
vpc_peering_routing_tables_with_least_privilege,
|
||||
)
|
||||
]
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == len(
|
||||
ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"]
|
||||
)
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables."
|
||||
)
|
||||
assert result[0].resource_id == vpc_pcx_id
|
||||
assert result[0].region == AWS_REGION
|
||||
service_client.vpc_peering_connections[0].route_tables = [
|
||||
Route(
|
||||
id=main_route_table_id,
|
||||
destination_cidrs=["10.0.0.0/16"],
|
||||
)
|
||||
]
|
||||
check = vpc_peering_routing_tables_with_least_privilege()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == len(
|
||||
ec2_client.describe_vpc_peering_connections()[
|
||||
"VpcPeeringConnections"
|
||||
]
|
||||
)
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables."
|
||||
)
|
||||
assert result[0].resource_id == vpc_pcx_id
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
Reference in New Issue
Block a user