feat(vpc): add service, checks and tests (#1432)

This commit is contained in:
Sergio Garcia
2022-10-28 12:15:15 +02:00
committed by GitHub
parent 7e1b0d13c7
commit b2976984d3
32 changed files with 1566 additions and 246 deletions

View File

@@ -1 +1,6 @@
shodan_api_key: null
# Single account environment: No action required. The AWS account number will be automatically added by the checks.
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"]
trusted_account_ids : []

View File

View File

@@ -1,52 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check29="2.9"
CHECK_TITLE_check29="[check29] Ensure VPC Flow Logging is Enabled in all VPCs"
CHECK_SCORED_check29="SCORED"
CHECK_CIS_LEVEL_check29="LEVEL2"
CHECK_SEVERITY_check29="Medium"
CHECK_ASFF_TYPE_check29="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check29="AwsEc2Vpc"
CHECK_ALTERNATE_check209="check29"
CHECK_ASFF_COMPLIANCE_TYPE_check29="ens-op.mon.1.aws.flow.1"
CHECK_SERVICENAME_check29="vpc"
CHECK_RISK_check29='VPC Flow Logs provide visibility into network traffic that traverses the VPC and can be used to detect anomalous traffic or insight during security workflows.'
CHECK_REMEDIATION_check29='It is recommended that VPC Flow Logs be enabled for packet "Rejects" for VPCs. '
CHECK_DOC_check29='http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs.html '
CHECK_CAF_EPIC_check29='Logging and Monitoring'
check29(){
# "Ensure VPC Flow Logging is Enabled in all VPCs (Scored)"
for regx in $REGIONS; do
AVAILABLE_VPC=$($AWSCLI ec2 describe-vpcs $PROFILE_OPT --region $regx --query 'Vpcs[?State==`available`].VpcId' --output text 2>&1)
if [[ $(echo "$AVAILABLE_VPC" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
textInfo "$regx: Access Denied trying to describe VPCs" "$regx" "$vpcx"
continue
fi
for vpcx in $AVAILABLE_VPC; do
CHECK_FL=$($AWSCLI ec2 describe-flow-logs $PROFILE_OPT --region $regx --filter Name="resource-id",Values="${vpcx}" --query 'FlowLogs[?FlowLogStatus==`ACTIVE`].FlowLogId' --output text 2>&1)
if [[ $(echo "$CHECK_FL" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
textInfo "$regx: Access Denied trying to describe flow logs in VPC $vpcx" "$regx" "$vpcx"
continue
fi
if [[ $CHECK_FL ]]; then
for FL in $CHECK_FL; do
textPass "$regx: VPC $vpcx VPCFlowLog is enabled for LogGroupName: $FL" "$regx" "$vpcx"
done
else
textFail "$regx: VPC $vpcx VPCFlowLog is disabled" "$regx" "$vpcx"
fi
done
done
}

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_check44="4.4"
CHECK_TITLE_check44="[check44] Ensure routing tables for VPC peering are \"least access\""
CHECK_SCORED_check44="NOT_SCORED"
CHECK_CIS_LEVEL_check44="LEVEL2"
CHECK_SEVERITY_check44="Medium"
CHECK_ASFF_TYPE_check44="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark"
CHECK_ASFF_RESOURCE_TYPE_check44="AwsEc2Vpc"
CHECK_ALTERNATE_check404="check44"
CHECK_SERVICENAME_check44="vpc"
CHECK_RISK_check44='Being highly selective in peering routing tables is a very effective way of minimizing the impact of breach as resources outside of these routes are inaccessible to the peered VPC.'
CHECK_REMEDIATION_check44='Review routing tables of peered VPCs for whether they route all subnets of each VPC and whether that is necessary to accomplish the intended purposes for peering the VPCs.'
CHECK_DOC_check44='https://docs.aws.amazon.com/vpc/latest/peering/peering-configurations-partial-access.html'
CHECK_CAF_EPIC_check44='Infrastructure Security'
check44(){
# "Ensure routing tables for VPC peering are \"least access\" (Not Scored)"
for regx in $REGIONS; do
LIST_OF_VPCS_PEERING_CONNECTIONS=$($AWSCLI ec2 describe-vpc-peering-connections --output text $PROFILE_OPT --region $regx --query 'VpcPeeringConnections[*].VpcPeeringConnectionId' 2>&1| sort | paste -s -d" " - )
if [[ $(echo "$LIST_OF_VPCS_PEERING_CONNECTIONS" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then
textInfo "$regx: Access Denied trying to describe vpc peering connections" "$regx"
continue
fi
if [[ $LIST_OF_VPCS_PEERING_CONNECTIONS ]];then
textInfo "$regx: $LIST_OF_VPCS_PEERING_CONNECTIONS - review routing tables" "$regx" "$LIST_OF_VPCS_PEERING_CONNECTIONS"
#LIST_OF_VPCS=$($AWSCLI ec2 describe-vpcs $PROFILE_OPT --region $regx --query 'Vpcs[*].VpcId' --output text)
#aws ec2 describe-route-tables --filter "Name=vpc-id,Values=vpc-0213e864" --query "RouteTables[*].{RouteTableId:RouteTableId, VpcId:VpcId, Routes:Routes, AssociatedSubnets:Associations[*].SubnetId}" $PROFILE_OPT --region $regx
# for vpc in $LIST_OF_VPCS; do
# VPCS_WITH_PEERING=$($AWSCLI ec2 describe-route-tables --filter "Name=vpc-id,Values=$vpc" $PROFILE_OPT --region $regx --query "RouteTables[*].{RouteTableId:RouteTableId, VpcId:VpcId, Routes:Routes, AssociatedSubnets:Associations[*].SubnetId}" |grep GatewayId|grep pcx-)
# done
#echo $VPCS_WITH_PEERING
else
textPass "$regx: No VPC peering found" "$regx" "$LIST_OF_VPCS_PEERING_CONNECTIONS"
fi
done
}

View File

@@ -1,67 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2020) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra789="7.89"
CHECK_TITLE_extra789="[extra789] Find trust boundaries in VPC endpoint services connections"
CHECK_SCORED_extra789="NOT_SCORED"
CHECK_CIS_LEVEL_extra789="EXTRA"
CHECK_SEVERITY_extra789="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra789="AwsEc2Vpc"
CHECK_ALTERNATE_extra789="extra789"
CHECK_SERVICENAME_extra789="vpc"
CHECK_RISK_extra789='Account VPC could be linked to other accounts.'
CHECK_REMEDIATION_extra789='In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.'
CHECK_DOC_extra789='https://github.com/toniblyx/prowler/#trust-boundaries-checks'
CHECK_CAF_EPIC_extra789='Infrastructure Security'
extra789(){
TRUSTED_ACCOUNT_IDS=$( echo "${ACCOUNT_NUM} ${GROUP_TRUSTBOUNDARIES_TRUSTED_ACCOUNT_IDS}" | xargs )
for regx in ${REGIONS}; do
ENDPOINT_SERVICES_IDS=$(${AWSCLI} ec2 describe-vpc-endpoint-services \
${PROFILE_OPT} \
--query "ServiceDetails[?Owner=='${ACCOUNT_NUM}'].ServiceId" \
--region ${regx} \
--output text | xargs 2>&1)
if [[ $(echo "$ENDPOINT_SERVICES_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe VPC endpoint services" "$regx"
continue
fi
for ENDPOINT_SERVICE_ID in ${ENDPOINT_SERVICES_IDS}; do
ENDPOINT_CONNECTION_LIST=$(${AWSCLI} ec2 describe-vpc-endpoint-connections \
${PROFILE_OPT} \
--query "VpcEndpointConnections[?VpcEndpointState=='available'].VpcEndpointOwner" \
--region ${regx} \
--output text | xargs
)
for ENDPOINT_CONNECTION in ${ENDPOINT_CONNECTION_LIST}; do
for ACCOUNT_ID in ${TRUSTED_ACCOUNT_IDS}; do
if [[ "${ACCOUNT_ID}" == "${ENDPOINT_CONNECTION}" ]]; then
textPass "${regx}: Found trusted account in VPC endpoint service connection ${ENDPOINT_CONNECTION}" "${regx}" "${ENDPOINT_CONNECTION}"
# Algorithm:
# Remove all trusted ACCOUNT_IDs from ENDPOINT_CONNECTION_LIST.
# As a result, the ENDPOINT_CONNECTION_LIST finally contains only unknown/untrusted account ids.
ENDPOINT_CONNECTION_LIST=("${ENDPOINT_CONNECTION_LIST[@]/$ENDPOINT_CONNECTION}") # remove hit from allowlist
fi
done
done
for UNTRUSTED_CONNECTION in ${ENDPOINT_CONNECTION_LIST}; do
textFail "${regx}: Found untrusted account in VPC endpoint service connection ${UNTRUSTED_CONNECTION}" "${regx}" "${ENDPOINT_CONNECTION}"
done
done
done
}

View File

@@ -1,70 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2020) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra790="7.90"
CHECK_TITLE_extra790="[extra790] Find trust boundaries in VPC endpoint services allowlisted principles"
CHECK_SCORED_extra790="NOT_SCORED"
CHECK_CIS_LEVEL_extra790="EXTRA"
CHECK_SEVERITY_extra790="Medium"
CHECK_ASFF_RESOURCE_TYPE_extra790="AwsEc2Vpc"
CHECK_ALTERNATE_extra790="extra790"
CHECK_SERVICENAME_extra790="vpc"
CHECK_RISK_extra790='Account VPC could be linked to other accounts.'
CHECK_REMEDIATION_extra790='In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.'
CHECK_DOC_extra790='https://github.com/toniblyx/prowler/#trust-boundaries-checks'
CHECK_CAF_EPIC_extra790='Infrastructure Security'
extra790(){
TRUSTED_ACCOUNT_IDS=$( echo "${ACCOUNT_NUM} ${GROUP_TRUSTBOUNDARIES_TRUSTED_ACCOUNT_IDS}" | xargs )
for regx in ${REGIONS}; do
ENDPOINT_SERVICES_IDS=$(${AWSCLI} ec2 describe-vpc-endpoint-services \
${PROFILE_OPT} \
--query "ServiceDetails[?Owner=='${ACCOUNT_NUM}'].ServiceId" \
--region ${regx} \
--output text | xargs 2>&1)
if [[ $(echo "$ENDPOINT_SERVICES_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then
textInfo "$regx: Access Denied trying to describe VPC endpoint services" "$regx"
continue
fi
for ENDPOINT_SERVICE_ID in ${ENDPOINT_SERVICES_IDS}; do
ENDPOINT_PERMISSIONS_LIST=$(${AWSCLI} ec2 describe-vpc-endpoint-service-permissions \
${PROFILE_OPT} \
--service-id ${ENDPOINT_SERVICE_ID} \
--query "AllowedPrincipals[*].Principal" \
--region ${regx} \
--output text | xargs
)
for ENDPOINT_PERMISSION in ${ENDPOINT_PERMISSIONS_LIST}; do
# Take only account id from ENDPOINT_PERMISSION: arn:aws:iam::965406151242:root
ENDPOINT_PERMISSION_ACCOUNT_ID=$(echo ${ENDPOINT_PERMISSION} | cut -d':' -f5 | xargs)
for ACCOUNT_ID in ${TRUSTED_ACCOUNT_IDS}; do
if [[ "${ACCOUNT_ID}" == "${ENDPOINT_PERMISSION_ACCOUNT_ID}" ]]; then
textPass "${regx}: Found trusted account in VPC endpoint service permission ${ENDPOINT_PERMISSION}" "${regx}"
# Algorithm:
# Remove all trusted ACCOUNT_IDs from ENDPOINT_PERMISSIONS_LIST.
# As a result, the ENDPOINT_PERMISSIONS_LIST finally contains only unknown/untrusted account ids.
ENDPOINT_PERMISSIONS_LIST=("${ENDPOINT_PERMISSIONS_LIST[@]/$ENDPOINT_PERMISSION}")
fi
done
done
for UNTRUSTED_PERMISSION in ${ENDPOINT_PERMISSIONS_LIST}; do
textFail "${regx}: Found untrusted account in VPC endpoint service permission ${UNTRUSTED_PERMISSION}" "${regx}" "${UNTRUSTED_PERMISSION}"
done
done
done
}

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
vpc_client = VPC(current_audit_info)

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "vpc_endpoint_connections_trust_boundaries",
"CheckTitle": "Find trust boundaries in VPC endpoint connections.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "vpc",
"SubServiceName": "endpoint",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Vpc",
"Description": "Find trust boundaries in VPC endpoint connections.",
"Risk": "Account VPC could be linked to other accounts.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/networking_9#aws-vpc-endpoints-are-exposed",
"Terraform": ""
},
"Recommendation": {
"Text": "In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.",
"Url": "https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,45 @@
from config.config import get_config_var
from lib.check.models import Check, Check_Report
from providers.aws.services.vpc.vpc_client import vpc_client
class vpc_endpoint_connections_trust_boundaries(Check):
def execute(self):
findings = []
# Get trusted account_ids from config.yaml
trusted_account_ids = get_config_var("trusted_account_ids")
for endpoint in vpc_client.vpc_endpoints:
# Check VPC endpoint policy
for statement in endpoint.policy_document["Statement"]:
if "*" == statement["Principal"]:
report = Check_Report(self.metadata)
report.region = endpoint.region
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access."
report.resource_id = endpoint.id
findings.append(report)
break
else:
if type(statement["Principal"]["AWS"]) == str:
principals = [statement["Principal"]["AWS"]]
else:
principals = statement["Principal"]["AWS"]
for principal_arn in principals:
account_id = principal_arn.split(":")[4]
report = Check_Report(self.metadata)
report.region = endpoint.region
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}."
report.resource_id = endpoint.id
findings.append(report)
return findings

View File

@@ -0,0 +1,272 @@
import json
from unittest import mock
from boto3 import client
from moto import mock_ec2
AWS_REGION = "us-east-1"
ACCOUNT_ID = "123456789012"
def mock_get_config_var(config_var):
if config_var == "trusted_account_ids":
return ["123456789010"]
return []
class Test_vpc_endpoint_connections_trust_boundaries:
@mock_ec2
def test_vpc_no_endpoints(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_vpc_endpoint_with_full_access(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"]
vpc_endpoint = ec2_client.create_vpc_endpoint(
VpcId=vpc["VpcId"],
ServiceName="com.amazonaws.us-east-1.s3",
RouteTableIds=[route_table["RouteTableId"]],
VpcEndpointType="Gateway",
PolicyDocument=json.dumps(
{
"Statement": [
{
"Action": "*",
"Effect": "Allow",
"Principal": "*",
"Resource": "*",
}
]
}
),
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access."
)
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
assert result[0].region == AWS_REGION
@mock_ec2
def test_vpc_endpoint_with_trusted_account(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"]
vpc_endpoint = ec2_client.create_vpc_endpoint(
VpcId=vpc["VpcId"],
ServiceName="com.amazonaws.us-east-1.s3",
RouteTableIds=[route_table["RouteTableId"]],
VpcEndpointType="Gateway",
PolicyDocument=json.dumps(
{
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789012:root"},
"Action": "*",
"Resource": "*",
}
]
}
),
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
current_audit_info.audited_account = ACCOUNT_ID
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account {ACCOUNT_ID} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
)
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
assert result[0].region == AWS_REGION
@mock_ec2
def test_vpc_endpoint_with_untrusted_account(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"]
vpc_endpoint = ec2_client.create_vpc_endpoint(
VpcId=vpc["VpcId"],
ServiceName="com.amazonaws.us-east-1.s3",
RouteTableIds=[route_table["RouteTableId"]],
VpcEndpointType="Gateway",
PolicyDocument=json.dumps(
{
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789010:root"},
"Action": "*",
"Resource": "*",
}
]
}
),
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
current_audit_info.audited_account = ACCOUNT_ID
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
)
assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
@mock_ec2
def test_vpc_endpoint_with_config_trusted_account(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"]
vpc_endpoint = ec2_client.create_vpc_endpoint(
VpcId=vpc["VpcId"],
ServiceName="com.amazonaws.us-east-1.s3",
RouteTableIds=[route_table["RouteTableId"]],
VpcEndpointType="Gateway",
PolicyDocument=json.dumps(
{
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789010:root"},
"Action": "*",
"Resource": "*",
}
]
}
),
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
current_audit_info.audited_account = ACCOUNT_ID
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var",
new=mock_get_config_var,
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}."
)
assert (
result[0].resource_id
== vpc_endpoint["VpcEndpoint"]["VpcEndpointId"]
)
assert result[0].region == AWS_REGION
@mock_ec2
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client",
new=mock_client,
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import (
vpc_endpoint_connections_trust_boundaries,
)
check = vpc_endpoint_connections_trust_boundaries()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "vpc_endpoint_services_allowed_principals_trust_boundaries",
"CheckTitle": "Find trust boundaries in VPC endpoint services allowlisted principles.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "vpc",
"SubServiceName": "service_endpoint",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Vpc",
"Description": "Find trust boundaries in VPC endpoint services allowlisted principles.",
"Risk": "Account VPC could be linked to other accounts.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/networking_9#aws-vpc-endpoints-are-exposed",
"Terraform": ""
},
"Recommendation": {
"Text": "In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.",
"Url": "https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,39 @@
from config.config import get_config_var
from lib.check.models import Check, Check_Report
from providers.aws.services.vpc.vpc_client import vpc_client
class vpc_endpoint_services_allowed_principals_trust_boundaries(Check):
def execute(self):
findings = []
# Get trusted account_ids from config.yaml
trusted_account_ids = get_config_var("trusted_account_ids")
for service in vpc_client.vpc_endpoint_services:
if not service.allowed_principals:
report = Check_Report(self.metadata)
report.region = service.region
report.status = "PASS"
report.status_extended = (
f"VPC Endpoint Service {service.id} has no allowed principals."
)
report.resource_id = service.id
findings.append(report)
else:
for principal in service.allowed_principals:
account_id = principal.split(":")[4]
report = Check_Report(self.metadata)
report.region = service.region
if (
account_id in trusted_account_ids
or account_id in vpc_client.audited_account
):
report.status = "PASS"
report.status_extended = f"Found trusted account {account_id} in VPC Endpoint Service {service.id}."
report.resource_id = service.id
else:
report.status = "FAIL"
report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint Service {service.id}."
report.resource_id = service.id
findings.append(report)
return findings

View File

@@ -0,0 +1,128 @@
from unittest import mock
import botocore
from boto3 import client
from mock import patch
from moto import mock_ec2, mock_elbv2
AWS_REGION = "us-east-1"
ACCOUNT_ID = "123456789012"
# Mocking VPC Calls
make_api_call = botocore.client.BaseClient._make_api_call
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "DescribeVpcEndpointServices":
return {
"ServiceDetails": [
{
"ServiceId": "vpce-svc-4b919ac5",
"ServiceName": "string",
"Owner": ACCOUNT_ID,
"StageName": "test-stage",
}
]
}
return make_api_call(self, operation_name, kwarg)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_vpc_endpoint_services_allowed_principals_trust_boundaries:
@mock_ec2
def test_vpc_no_endpoint_services(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 23 # one endpoint per region
@mock_ec2
@mock_elbv2
def test_vpc_endpoint_service_without_allowed_principals(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
elbv2_client = client("elbv2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
# Service is mocked until moto fix the issue https://github.com/spulec/moto/issues/5605
# service = ec2_client.create_vpc_endpoint_service_configuration(
# NetworkLoadBalancerArns=[lb_arn]
# )
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 23 # one per region
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals."
)
assert (
result[0].resource_id
== ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][
"ServiceId"
]
)
@mock_ec2
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client",
new=mock_client,
):
# Test Check
from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import (
vpc_endpoint_services_allowed_principals_trust_boundaries,
)
check = vpc_endpoint_services_allowed_principals_trust_boundaries()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "vpc_flow_logs_enabled",
"CheckTitle": "Ensure VPC Flow Logging is Enabled in all VPCs.",
"CheckType": ["Logging and Monitoring"],
"ServiceName": "vpc",
"SubServiceName": "flow_log",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Vpc",
"Description": "Ensure VPC Flow Logging is Enabled in all VPCs.",
"Risk": "VPC Flow Logs provide visibility into network traffic that traverses the VPC and can be used to detect anomalous traffic or insight during security workflows.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.bridgecrew.io/docs/logging_9-enable-vpc-flow-logging#aws-console",
"Terraform": "https://docs.bridgecrew.io/docs/logging_9-enable-vpc-flow-logging#terraform"
},
"Recommendation": {
"Text": "It is recommended that VPC Flow Logs be enabled for packet Rejects for VPCs.",
"Url": "http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,21 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.vpc.vpc_client import vpc_client
class vpc_flow_logs_enabled(Check):
def execute(self):
findings = []
for vpc in vpc_client.vpcs:
report = Check_Report(self.metadata)
report.region = vpc.region
if vpc.flow_log:
report.status = "PASS"
report.status_extended = f"VPC {vpc.id} Flow logs are enabled."
report.resource_id = vpc.id
else:
report.status = "FAIL"
report.status_extended = f"VPC {vpc.id} Flow logs are disabled."
report.resource_id = vpc.id
findings.append(report)
return findings

View File

@@ -0,0 +1,127 @@
from unittest import mock
from boto3 import client
from moto import mock_ec2
AWS_REGION = "us-east-1"
ACCOUNT_ID = "123456789012"
class Test_vpc_flow_logs_enabled:
@mock_ec2
def test_vpc_only_default_vpcs(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
vpc_flow_logs_enabled,
)
check = vpc_flow_logs_enabled()
result = check.execute()
assert (
len(result) == 23
) # Number of AWS regions, one default VPC per region
@mock_ec2
def test_vpc_with_flow_logs(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
ec2_client.create_flow_logs(
ResourceType="VPC",
ResourceIds=[vpc["VpcId"]],
TrafficType="ALL",
LogDestinationType="cloud-watch-logs",
LogGroupName="test_logs",
DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
vpc_flow_logs_enabled,
)
check = vpc_flow_logs_enabled()
result = check.execute()
# Search created VPC among default ones
for result in result:
if result.resource_id == vpc["VpcId"]:
assert result.status == "PASS"
assert (
result.status_extended
== f"VPC {vpc['VpcId']} Flow logs are enabled."
)
assert result.resource_id == vpc["VpcId"]
@mock_ec2
def test_vpc_without_flow_logs(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
vpc_flow_logs_enabled,
)
check = vpc_flow_logs_enabled()
result = check.execute()
# Search created VPC among default ones
for result in result:
if result.resource_id == vpc["VpcId"]:
assert result.status == "FAIL"
assert (
result.status_extended
== f"VPC {vpc['VpcId']} Flow logs are disabled."
)
assert result.resource_id == vpc["VpcId"]
@mock_ec2
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client",
new=mock_client,
):
# Test Check
from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import (
vpc_flow_logs_enabled,
)
check = vpc_flow_logs_enabled()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,35 @@
{
"Provider": "aws",
"CheckID": "vpc_peering_routing_tables_with_least_privilege",
"CheckTitle": "Ensure routing tables for VPC peering are least access.",
"CheckType": ["Infrastructure Security"],
"ServiceName": "vpc",
"SubServiceName": "route_table",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "AwsEc2Vpc",
"Description": "Ensure routing tables for VPC peering are least access.",
"Risk": "Being highly selective in peering routing tables is a very effective way of minimizing the impact of breach as resources outside of these routes are inaccessible to the peered VPC.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "https://docs.bridgecrew.io/docs/networking_5#cli-command",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review routing tables of peered VPCs for whether they route all subnets of each VPC and whether that is necessary to accomplish the intended purposes for peering the VPCs.",
"Url": "https://docs.aws.amazon.com/vpc/latest/peering/peering-configurations-partial-access.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": []
}

View File

@@ -0,0 +1,31 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.vpc.vpc_client import vpc_client
class vpc_peering_routing_tables_with_least_privilege(Check):
def execute(self):
findings = []
for peer in vpc_client.vpc_peering_connections:
report = Check_Report(self.metadata)
report.region = peer.region
comply = True
# Check each cidr in the peering route table
for route_table in peer.route_tables:
for cidr in route_table.destination_cidrs:
if (
cidr == "0.0.0.0/0"
or cidr == peer.requester_cidr
or cidr == peer.accepter_cidr
): # Check if cidr does not accept whole requester/accepter VPC CIDR
comply = False
if not comply:
report.status = "FAIL"
report.status_extended = f"VPC Peering Connection {peer.id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables."
report.resource_id = peer.id
else:
report.status = "PASS"
report.status_extended = f"VPC Peering Connection {peer.id} comply with least privilege access."
report.resource_id = peer.id
findings.append(report)
return findings

View File

@@ -0,0 +1,172 @@
from unittest import mock
from boto3 import client, resource
from moto import mock_ec2
AWS_REGION = "us-east-1"
ACCOUNT_ID = "123456789012"
class Test_vpc_peering_routing_tables_with_least_privilege:
@mock_ec2
def test_vpc_no_peering_connections(self):
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
new=VPC(current_audit_info),
):
# Test Check
from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
vpc_peering_routing_tables_with_least_privilege,
)
check = vpc_peering_routing_tables_with_least_privilege()
result = check.execute()
assert len(result) == 0
@mock_ec2
def test_vpc_comply_peering_connection_(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create VPCs peers as well as a comply route
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
main_route_table_id = ec2_client.describe_route_tables(
Filters=[
{"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]},
{"Name": "association.main", "Values": ["true"]},
]
)["RouteTables"][0]["RouteTableId"]
main_route_table = ec2_resource.RouteTable(main_route_table_id)
main_route_table.create_route(
DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC, Route
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
new=VPC(current_audit_info),
) as service_client:
# Test Check
from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
vpc_peering_routing_tables_with_least_privilege,
)
service_client.vpc_peering_connections[0].route_tables = [
Route(
main_route_table_id,
["10.12.23.44/32"],
)
]
check = vpc_peering_routing_tables_with_least_privilege()
result = check.execute()
assert len(result) == len(
ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"]
)
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access."
)
assert result[0].resource_id == vpc_pcx_id
assert result[0].region == AWS_REGION
@mock_ec2
def test_vpc_not_comply_peering_connection_(self):
# Create VPC Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create VPCs peers as well as a comply route
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
main_route_table_id = ec2_client.describe_route_tables(
Filters=[
{"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]},
{"Name": "association.main", "Values": ["true"]},
]
)["RouteTables"][0]["RouteTableId"]
main_route_table = ec2_resource.RouteTable(main_route_table_id)
main_route_table.create_route(
DestinationCidrBlock="10.0.0.0/16", VpcPeeringConnectionId=vpc_pcx_id
)
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.vpc.vpc_service import VPC, Route
current_audit_info.audited_partition = "aws"
with mock.patch(
"providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
new=VPC(current_audit_info),
) as service_client:
# Test Check
from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
vpc_peering_routing_tables_with_least_privilege,
)
service_client.vpc_peering_connections[0].route_tables = [
Route(
main_route_table_id,
["10.0.0.0/16"],
)
]
check = vpc_peering_routing_tables_with_least_privilege()
result = check.execute()
assert len(result) == len(
ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"]
)
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables."
)
assert result[0].resource_id == vpc_pcx_id
assert result[0].region == AWS_REGION
@mock_ec2
def test_bad_response(self):
mock_client = mock.MagicMock()
with mock.patch(
"providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client",
new=mock_client,
):
# Test Check
from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import (
vpc_peering_routing_tables_with_least_privilege,
)
check = vpc_peering_routing_tables_with_least_privilege()
result = check.execute()
assert len(result) == 0

View File

@@ -0,0 +1,306 @@
import json
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## VPC
class VPC:
def __init__(self, audit_info):
self.service = "ec2"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.vpcs = []
self.vpc_peering_connections = []
self.vpc_endpoints = []
self.vpc_endpoint_services = []
self.__threading_call__(self.__describe_vpcs__)
self.__threading_call__(self.__describe_vpc_peering_connections__)
self.__threading_call__(self.__describe_vpc_endpoints__)
self.__threading_call__(self.__describe_vpc_endpoint_services__)
self.__describe_flow_logs__()
self.__describe_route_tables__()
self.__describe_vpc_endpoint_service_permissions__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_vpcs__(self, regional_client):
logger.info("VPC - Describing VPCs...")
try:
describe_vpcs_paginator = regional_client.get_paginator("describe_vpcs")
for page in describe_vpcs_paginator.paginate():
for vpc in page["Vpcs"]:
self.vpcs.append(
VPCs(
vpc["VpcId"],
vpc["IsDefault"],
vpc["CidrBlock"],
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_vpc_peering_connections__(self, regional_client):
logger.info("VPC - Describing VPC Peering Connections...")
try:
describe_vpc_peering_connections_paginator = regional_client.get_paginator(
"describe_vpc_peering_connections"
)
for page in describe_vpc_peering_connections_paginator.paginate():
for conn in page["VpcPeeringConnections"]:
self.vpc_peering_connections.append(
VpcPeeringConnection(
conn["VpcPeeringConnectionId"],
conn["AccepterVpcInfo"]["VpcId"],
conn["AccepterVpcInfo"]["CidrBlock"],
conn["RequesterVpcInfo"]["VpcId"],
conn["RequesterVpcInfo"]["CidrBlock"],
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_route_tables__(self):
logger.info("VPC - Describing Peering Route Tables...")
try:
for conn in self.vpc_peering_connections:
regional_client = self.regional_clients[conn.region]
for route_table in regional_client.describe_route_tables(
Filters=[
{
"Name": "route.vpc-peering-connection-id",
"Values": [
conn.id,
],
},
]
)["RouteTables"]:
destination_cidrs = []
for route in route_table["Routes"]:
if (
route["Origin"] != "CreateRouteTable"
): # avoid default route table
destination_cidrs.append(route["DestinationCidrBlock"])
conn.route_tables.append(
Route(
route_table["RouteTableId"],
destination_cidrs,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_flow_logs__(self):
logger.info("VPC - Describing flow logs...")
try:
for vpc in self.vpcs:
regional_client = self.regional_clients[vpc.region]
flow_logs = regional_client.describe_flow_logs(
Filters=[
{
"Name": "resource-id",
"Values": [
vpc.id,
],
},
]
)["FlowLogs"]
if flow_logs:
vpc.flow_log = True
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
def __describe_vpc_endpoints__(self, regional_client):
logger.info("VPC - Describing VPC Endpoints...")
try:
describe_vpc_endpoints_paginator = regional_client.get_paginator(
"describe_vpc_endpoints"
)
for page in describe_vpc_endpoints_paginator.paginate():
for endpoint in page["VpcEndpoints"]:
self.vpc_endpoints.append(
VpcEndpoint(
endpoint["VpcEndpointId"],
endpoint["VpcId"],
endpoint["State"],
json.loads(endpoint["PolicyDocument"]),
endpoint["OwnerId"],
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_vpc_endpoint_services__(self, regional_client):
logger.info("VPC - Describing VPC Endpoint Services...")
try:
describe_vpc_endpoint_services_paginator = regional_client.get_paginator(
"describe_vpc_endpoint_services"
)
for page in describe_vpc_endpoint_services_paginator.paginate():
for endpoint in page["ServiceDetails"]:
if endpoint["Owner"] != "amazon":
self.vpc_endpoint_services.append(
VpcEndpointService(
endpoint["ServiceId"],
endpoint["ServiceName"],
endpoint["Owner"],
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __describe_vpc_endpoint_service_permissions__(self):
logger.info("VPC - Describing VPC Endpoint service permissions...")
try:
for service in self.vpc_endpoint_services:
regional_client = self.regional_clients[service.region]
for (
principal
) in regional_client.describe_vpc_endpoint_service_permissions(
ServiceId=service.id
)[
"AllowedPrincipals"
]:
service.allowed_principals.append(principal["Principal"])
except Exception as error:
logger.error(f"{error.__class__.__name__}: {error}")
@dataclass
class VPCs:
id: str
default: bool
cidr_block: str
flow_log: bool
region: str
def __init__(
self,
id,
default,
cidr_block,
region,
):
self.id = id
self.default = default
self.cidr_block = cidr_block
self.flow_log = False
self.region = region
@dataclass
class Route:
id: str
destination_cidrs: list[str]
def __init__(
self,
id,
destination_cidrs,
):
self.id = id
self.destination_cidrs = destination_cidrs
@dataclass
class VpcPeeringConnection:
id: str
accepter_vpc: str
accepter_cidr: str
requester_vpc: str
requester_cidr: str
route_tables: list[Route]
region: str
def __init__(
self,
id,
accepter_vpc,
accepter_cidr,
requester_vpc,
requester_cidr,
region,
):
self.id = id
self.accepter_vpc = accepter_vpc
self.accepter_cidr = accepter_cidr
self.requester_vpc = requester_vpc
self.requester_cidr = requester_cidr
self.route_tables = []
self.region = region
@dataclass
class VpcEndpoint:
id: str
vpc_id: str
state: str
policy_document: dict
owner_id: list[Route]
region: str
def __init__(
self,
id,
vpc_id,
state,
policy_document,
owner_id,
region,
):
self.id = id
self.vpc_id = vpc_id
self.state = state
self.policy_document = policy_document
self.owner_id = owner_id
self.route_tables = []
self.region = region
@dataclass
class VpcEndpointService:
id: str
service: str
owner_id: str
allowed_principals: list
region: str
def __init__(
self,
id,
service,
owner_id,
region,
):
self.id = id
self.service = service
self.owner_id = owner_id
self.allowed_principals = []
self.region = region

View File

@@ -0,0 +1,258 @@
import json
from boto3 import client, resource, session
from moto import mock_ec2, mock_elbv2
from providers.aws.lib.audit_info.models import AWS_Audit_Info
from providers.aws.services.vpc.vpc_service import VPC, Route
AWS_ACCOUNT_NUMBER = 123456789012
AWS_REGION = "us-east-1"
class Test_VPC_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
)
return audit_info
# Test VPC Service
@mock_ec2
def test_service(self):
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert vpc.service == "ec2"
# Test VPC Client
@mock_ec2
def test_client(self):
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
for client in vpc.regional_clients.values():
assert client.__class__.__name__ == "EC2"
# Test VPC Session
@mock_ec2
def test__get_session__(self):
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert vpc.session.__class__.__name__ == "Session"
# Test VPC Session
@mock_ec2
def test_audited_account(self):
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert vpc.audited_account == AWS_ACCOUNT_NUMBER
# Test VPC Describe VPCs
@mock_ec2
def test__describe_vpcs__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPC
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert (
len(vpc.vpcs) == 24
) # Number of AWS regions + created VPC, one default VPC per region
# Test VPC Describe Flow Logs
@mock_ec2
def test__describe_flow_logs__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
new_vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
# Create VPC Flow log
ec2_client.create_flow_logs(
ResourceType="VPC",
ResourceIds=[new_vpc["VpcId"]],
TrafficType="ALL",
LogDestinationType="cloud-watch-logs",
LogGroupName="test_logs",
DeliverLogsPermissionArn="arn:aws:iam::"
+ str(AWS_ACCOUNT_NUMBER)
+ ":role/test-role",
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
# Search created VPC among default ones
for vpc in vpc.vpcs:
if vpc.id == new_vpc["VpcId"]:
assert vpc.flow_log == True
# Test VPC Describe VPC Peering connections
@mock_ec2
def test__describe_vpc_peering_connections__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPCs peers
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert len(vpc.vpc_peering_connections) == 1
assert vpc.vpc_peering_connections[0].id == vpc_pcx_id
# Test VPC Describe VPC Peering connections
@mock_ec2
def test__describe_vpc_peering_connections__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPCs peers
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert len(vpc.vpc_peering_connections) == 1
assert vpc.vpc_peering_connections[0].id == vpc_pcx_id
# Test VPC Describe VPC Peering connections
@mock_ec2
def test__describe_route_tables__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_resource = resource("ec2", region_name=AWS_REGION)
# Create VPCs peers as well as a route
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16")
vpc_pcx = ec2_client.create_vpc_peering_connection(
VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"]
)
vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"]
vpc_pcx = ec2_client.accept_vpc_peering_connection(
VpcPeeringConnectionId=vpc_pcx_id
)
main_route_table_id = ec2_client.describe_route_tables(
Filters=[
{"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]},
{"Name": "association.main", "Values": ["true"]},
]
)["RouteTables"][0]["RouteTableId"]
# FilterNotImplementedError: The filter 'route.vpc-peering-connection-id' for DescribeRouteTables has not been implemented in Moto yet.
# main_route_table = ec2_resource.RouteTable(main_route_table_id)
# main_route_table.create_route(
# DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id
# )
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
vpc.vpc_peering_connections[0].route_tables = [
Route(
main_route_table_id,
["10.0.0.4/24"],
)
]
assert len(vpc.vpc_peering_connections[0].route_tables) == 1
assert vpc.vpc_peering_connections[0].id == vpc_pcx_id
# Test VPC Describe VPC Endpoints
@mock_ec2
def test__describe_vpc_endpoints__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
# Create VPC endpoint
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"]
endpoint = ec2_client.create_vpc_endpoint(
VpcId=vpc["VpcId"],
ServiceName="com.amazonaws.us-east-1.s3",
RouteTableIds=[route_table["RouteTableId"]],
VpcEndpointType="Gateway",
PolicyDocument=json.dumps(
{
"Statement": [
{
"Action": "*",
"Effect": "Allow",
"Principal": "*",
"Resource": "*",
}
]
}
),
)["VpcEndpoint"]["VpcEndpointId"]
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert len(vpc.vpc_endpoints) == 1
assert vpc.vpc_endpoints[0].id == endpoint
# Test VPC Describe VPC Endpoint Services
@mock_ec2
@mock_elbv2
def test__describe_vpc_endpoint_services__(self):
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION)
elbv2_client = client("elbv2", region_name=AWS_REGION)
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
AvailabilityZone=f"{AWS_REGION}a",
)
lb_name = "lb_vpce-test"
lb_arn = elbv2_client.create_load_balancer(
Name=lb_name,
Subnets=[subnet["Subnet"]["SubnetId"]],
Scheme="internal",
Type="network",
)["LoadBalancers"][0]["LoadBalancerArn"]
service = ec2_client.create_vpc_endpoint_service_configuration(
NetworkLoadBalancerArns=[lb_arn]
)
# VPC client for this test class
audit_info = self.set_mocked_audit_info()
vpc = VPC(audit_info)
assert (
len(vpc.vpc_endpoint_services) == 0
) # Wait until this issue is fixed https://github.com/spulec/moto/issues/5605