fix(vpc_endpoint_services_allowed_principals_trust_boundaries): Principal (#2991)

This commit is contained in:
Pepe Fagoaga
2023-10-31 14:19:20 +01:00
committed by GitHub
parent 1014d64828
commit f666711a2a
2 changed files with 315 additions and 94 deletions

View File

@@ -1,3 +1,5 @@
from re import compile
from prowler.lib.check.models import Check, Check_Report_AWS from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.vpc.vpc_client import vpc_client from prowler.providers.aws.services.vpc.vpc_client import vpc_client
@@ -21,7 +23,14 @@ class vpc_endpoint_services_allowed_principals_trust_boundaries(Check):
findings.append(report) findings.append(report)
else: else:
for principal in service.allowed_principals: for principal in service.allowed_principals:
account_id = principal.split(":")[4] # Account ID can be an ARN or just a 12-digit string
pattern = compile(r"^[0-9]{12}$")
match = pattern.match(principal)
if not match:
account_id = principal.split(":")[4]
else:
account_id = match.string
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = service.region report.region = service.region
if ( if (

View File

@@ -1,85 +1,27 @@
from unittest import mock from unittest import mock
import botocore from boto3 import client
from boto3 import client, session
from mock import patch
from moto import mock_ec2, mock_elbv2 from moto import mock_ec2, mock_elbv2
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info from tests.providers.aws.audit_info_utils import (
from prowler.providers.common.models import Audit_Metadata AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012" AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
# Mocking VPC Calls AWS_ACCOUNT_NUMBER_2 = "111122223333"
make_api_call = botocore.client.BaseClient._make_api_call AWS_ACCOUNT_ARN_2 = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER_2}:root"
def mock_make_api_call(self, operation_name, kwarg):
"""
We have to mock every AWS API call using Boto3
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
"""
if operation_name == "DescribeVpcEndpointServices":
return {
"ServiceDetails": [
{
"ServiceId": "vpce-svc-4b919ac5",
"ServiceName": "string",
"Owner": AWS_ACCOUNT_NUMBER,
"StageName": "test-stage",
}
]
}
return make_api_call(self, operation_name, kwarg)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_ec2 @mock_ec2
def test_vpc_no_endpoint_services(self): def test_no_vpc_endpoint_services(self):
# VPC Endpoint Services
ec2_client = client("ec2", region_name=AWS_REGION)
endpoint_id = ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][
"ServiceId"
]
endpoint_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = self.set_mocked_audit_info() current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable # Set config variable
current_audit_info.audit_config = {"trusted_account_ids": []} current_audit_info.audit_config = {"trusted_account_ids": []}
@@ -99,23 +41,14 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
check = vpc_endpoint_services_allowed_principals_trust_boundaries() check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute() result = check.execute()
assert len(result) == 1 # one endpoint per region assert len(result) == 0
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC Endpoint Service {endpoint_id} has no allowed principals."
)
assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags is None
assert result[0].region == AWS_REGION
@mock_ec2 @mock_ec2
@mock_elbv2 @mock_elbv2
def test_vpc_endpoint_service_without_allowed_principals(self): def test_vpc_endpoint_service_without_allowed_principals(self):
# Create VPC Mocked Resources # Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION) ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
elbv2_client = client("elbv2", region_name=AWS_REGION) elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1)
vpc = ec2_client.create_vpc( vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default" CidrBlock="172.28.7.0/24", InstanceTenancy="default"
@@ -123,7 +56,7 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
subnet = ec2_client.create_subnet( subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"], VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26", CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION}a", AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
) )
lb_name = "lb_vpce-test" lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer( lb_arn = elbv2_client.create_load_balancer(
@@ -133,18 +66,17 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
Type="network", Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"] )["LoadBalancers"][0]["LoadBalancerArn"]
_ = ec2_client.create_vpc_endpoint_service_configuration( endpoint_id = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn] NetworkLoadBalancerArns=[lb_arn]
) )["ServiceConfiguration"]["ServiceId"]
endpoint_id = ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
"ServiceId"
]
endpoint_arn = f"arn:aws:ec2:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = self.set_mocked_audit_info() current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable # Set config variable
current_audit_info.audit_config = {"trusted_account_ids": []} current_audit_info.audit_config = {"trusted_account_ids": []}
@@ -168,9 +100,289 @@ class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals." == f"VPC Endpoint Service {endpoint_id} has no allowed principals."
) )
assert result[0].resource_id == endpoint_id assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags is None assert result[0].resource_tags == []
assert result[0].region == AWS_REGION assert result[0].region == AWS_REGION_US_EAST_1
@mock_ec2
@mock_elbv2
def test_vpc_endpoint_service_with_allowed_principal_account_arn(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
endpoint_id = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)["ServiceConfiguration"]["ServiceId"]
# Add allowed principals
ec2_client.modify_vpc_endpoint_service_permissions(
ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_ARN]
)
endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable
current_audit_info.audit_config = {"trusted_account_ids": []}
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint Service {endpoint_id}."
)
assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
@mock_ec2
@mock_elbv2
def test_vpc_endpoint_service_with_allowed_principal_account_number(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
endpoint_id = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)["ServiceConfiguration"]["ServiceId"]
# Add allowed principals
ec2_client.modify_vpc_endpoint_service_permissions(
ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER]
)
endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable
current_audit_info.audit_config = {"trusted_account_ids": []}
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER} in VPC Endpoint Service {endpoint_id}."
)
assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
@mock_ec2
@mock_elbv2
def test_vpc_endpoint_service_with_principal_not_allowed(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
endpoint_id = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)["ServiceConfiguration"]["ServiceId"]
# Add allowed principals
ec2_client.modify_vpc_endpoint_service_permissions(
ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER_2]
)
endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable
current_audit_info.audit_config = {"trusted_account_ids": []}
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Found untrusted account {AWS_ACCOUNT_NUMBER_2} in VPC Endpoint Service {endpoint_id}."
)
assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
@mock_ec2
@mock_elbv2
def test_vpc_endpoint_service_with_principal_different_than_account_but_allowed_in_config(
self,
):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
elbv2_client = client("elbv2", region_name=AWS_REGION_US_EAST_1)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
endpoint_id = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)["ServiceConfiguration"]["ServiceId"]
# Add allowed principals
ec2_client.modify_vpc_endpoint_service_permissions(
ServiceId=endpoint_id, AddAllowedPrincipals=[AWS_ACCOUNT_NUMBER_2]
)
endpoint_arn = f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:vpc-endpoint-service/{endpoint_id}"
from prowler.providers.aws.services.vpc.vpc_service import VPC
current_audit_info = set_mocked_aws_audit_info(
audited_regions=[AWS_REGION_US_EAST_1]
)
# Set config variable
current_audit_info.audit_config = {
"trusted_account_ids": [AWS_ACCOUNT_NUMBER_2]
}
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {AWS_ACCOUNT_NUMBER_2} in VPC Endpoint Service {endpoint_id}."
)
assert result[0].resource_id == endpoint_id
assert result[0].resource_arn == endpoint_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1