From b2976984d3a8f5baf5eba1648f9b89e1089129f6 Mon Sep 17 00:00:00 2001 From: Sergio Garcia <38561120+sergargar@users.noreply.github.com> Date: Fri, 28 Oct 2022 12:15:15 +0200 Subject: [PATCH] feat(vpc): add service, checks and tests (#1432) --- Pipfile | 2 +- config/config.py | 25 +- providers/aws/config.yaml | 5 + .../aws/services/{vpc => cloudwatch}/check311 | 0 .../aws/services/{vpc => cloudwatch}/check312 | 0 .../aws/services/{vpc => cloudwatch}/check313 | 0 .../aws/services/{vpc => cloudwatch}/check314 | 0 .../aws/services/{s3 => cloudwatch}/check38 | 0 providers/aws/services/vpc/__init__.py | 0 providers/aws/services/vpc/check29 | 52 --- providers/aws/services/vpc/check44 | 48 --- providers/aws/services/vpc/check_extra789 | 67 ---- providers/aws/services/vpc/check_extra790 | 70 ---- providers/aws/services/vpc/vpc_client.py | 4 + .../__init__.py | 0 ...connections_trust_boundaries.metadata.json | 35 ++ ...c_endpoint_connections_trust_boundaries.py | 45 +++ ...point_connections_trust_boundaries_test.py | 272 ++++++++++++++++ .../__init__.py | 0 ..._principals_trust_boundaries.metadata.json | 35 ++ ...ces_allowed_principals_trust_boundaries.py | 39 +++ ...llowed_principals_trust_boundaries_test.py | 128 ++++++++ .../vpc/vpc_flow_logs_enabled/__init__.py | 0 .../vpc_flow_logs_enabled.metadata.json | 35 ++ .../vpc_flow_logs_enabled.py | 21 ++ .../vpc_flow_logs_enabled_test.py | 127 ++++++++ .../__init__.py | 0 ..._tables_with_least_privilege.metadata.json | 35 ++ ...ing_routing_tables_with_least_privilege.py | 31 ++ ...outing_tables_with_least_privilege_test.py | 172 ++++++++++ providers/aws/services/vpc/vpc_service.py | 306 ++++++++++++++++++ .../aws/services/vpc/vpc_service_test.py | 258 +++++++++++++++ 32 files changed, 1566 insertions(+), 246 deletions(-) rename providers/aws/services/{vpc => cloudwatch}/check311 (100%) rename providers/aws/services/{vpc => cloudwatch}/check312 (100%) rename providers/aws/services/{vpc => cloudwatch}/check313 (100%) rename providers/aws/services/{vpc => cloudwatch}/check314 (100%) rename providers/aws/services/{s3 => cloudwatch}/check38 (100%) create mode 100644 providers/aws/services/vpc/__init__.py delete mode 100644 providers/aws/services/vpc/check29 delete mode 100644 providers/aws/services/vpc/check44 delete mode 100644 providers/aws/services/vpc/check_extra789 delete mode 100644 providers/aws/services/vpc/check_extra790 create mode 100644 providers/aws/services/vpc/vpc_client.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/__init__.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.metadata.json create mode 100644 providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/__init__.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.metadata.json create mode 100644 providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py create mode 100644 providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py create mode 100644 providers/aws/services/vpc/vpc_flow_logs_enabled/__init__.py create mode 100644 providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.metadata.json create mode 100644 providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.py create mode 100644 providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py create mode 100644 providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/__init__.py create mode 100644 providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.metadata.json create mode 100644 providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.py create mode 100644 providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py create mode 100644 providers/aws/services/vpc/vpc_service.py create mode 100644 providers/aws/services/vpc/vpc_service_test.py diff --git a/Pipfile b/Pipfile index 1d00e1b2..c7c461ab 100644 --- a/Pipfile +++ b/Pipfile @@ -9,7 +9,7 @@ boto3 = "1.24.8" arnparse = "0.0.2" botocore = "1.27.8" pydantic = "1.9.1" -moto = {extras = ["iam"], version = "3.1.14"} +moto = "4.0.8" sure = "2.0.0" bandit = "1.7.4" safety = "1.10.3" diff --git a/config/config.py b/config/config.py index 0ad5db2c..68c2516e 100644 --- a/config/config.py +++ b/config/config.py @@ -3,6 +3,8 @@ from os import getcwd import yaml +from lib.logger import logger + timestamp = datetime.today() timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc) prowler_version = "3.0-beta-08Aug2022" @@ -24,17 +26,24 @@ config_yaml = "providers/aws/config.yaml" def change_config_var(variable, value): - with open(config_yaml) as f: - doc = yaml.safe_load(f) + try: + with open(config_yaml) as f: + doc = yaml.safe_load(f) - doc[variable] = value + doc[variable] = value - with open(config_yaml, "w") as f: - yaml.dump(doc, f) + with open(config_yaml, "w") as f: + yaml.dump(doc, f) + except Exception as error: + logger.error(f"{error.__class__.__name__}: {error}") def get_config_var(variable): - with open(config_yaml) as f: - doc = yaml.safe_load(f) + try: + with open(config_yaml) as f: + doc = yaml.safe_load(f) - return doc[variable] + return doc[variable] + except Exception as error: + logger.error(f"{error.__class__.__name__}: {error}") + return "" diff --git a/providers/aws/config.yaml b/providers/aws/config.yaml index 3281beb8..198e812b 100644 --- a/providers/aws/config.yaml +++ b/providers/aws/config.yaml @@ -1 +1,6 @@ shodan_api_key: null + +# Single account environment: No action required. The AWS account number will be automatically added by the checks. +# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g. +# trusted_account_ids : ["123456789012", "098765432109", "678901234567"] +trusted_account_ids : [] diff --git a/providers/aws/services/vpc/check311 b/providers/aws/services/cloudwatch/check311 similarity index 100% rename from providers/aws/services/vpc/check311 rename to providers/aws/services/cloudwatch/check311 diff --git a/providers/aws/services/vpc/check312 b/providers/aws/services/cloudwatch/check312 similarity index 100% rename from providers/aws/services/vpc/check312 rename to providers/aws/services/cloudwatch/check312 diff --git a/providers/aws/services/vpc/check313 b/providers/aws/services/cloudwatch/check313 similarity index 100% rename from providers/aws/services/vpc/check313 rename to providers/aws/services/cloudwatch/check313 diff --git a/providers/aws/services/vpc/check314 b/providers/aws/services/cloudwatch/check314 similarity index 100% rename from providers/aws/services/vpc/check314 rename to providers/aws/services/cloudwatch/check314 diff --git a/providers/aws/services/s3/check38 b/providers/aws/services/cloudwatch/check38 similarity index 100% rename from providers/aws/services/s3/check38 rename to providers/aws/services/cloudwatch/check38 diff --git a/providers/aws/services/vpc/__init__.py b/providers/aws/services/vpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/vpc/check29 b/providers/aws/services/vpc/check29 deleted file mode 100644 index 035fc345..00000000 --- a/providers/aws/services/vpc/check29 +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_check29="2.9" -CHECK_TITLE_check29="[check29] Ensure VPC Flow Logging is Enabled in all VPCs" -CHECK_SCORED_check29="SCORED" -CHECK_CIS_LEVEL_check29="LEVEL2" -CHECK_SEVERITY_check29="Medium" -CHECK_ASFF_TYPE_check29="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark" -CHECK_ASFF_RESOURCE_TYPE_check29="AwsEc2Vpc" -CHECK_ALTERNATE_check209="check29" -CHECK_ASFF_COMPLIANCE_TYPE_check29="ens-op.mon.1.aws.flow.1" -CHECK_SERVICENAME_check29="vpc" -CHECK_RISK_check29='VPC Flow Logs provide visibility into network traffic that traverses the VPC and can be used to detect anomalous traffic or insight during security workflows.' -CHECK_REMEDIATION_check29='It is recommended that VPC Flow Logs be enabled for packet "Rejects" for VPCs. ' -CHECK_DOC_check29='http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs.html ' -CHECK_CAF_EPIC_check29='Logging and Monitoring' - -check29(){ - # "Ensure VPC Flow Logging is Enabled in all VPCs (Scored)" - for regx in $REGIONS; do - AVAILABLE_VPC=$($AWSCLI ec2 describe-vpcs $PROFILE_OPT --region $regx --query 'Vpcs[?State==`available`].VpcId' --output text 2>&1) - if [[ $(echo "$AVAILABLE_VPC" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then - textInfo "$regx: Access Denied trying to describe VPCs" "$regx" "$vpcx" - continue - fi - for vpcx in $AVAILABLE_VPC; do - CHECK_FL=$($AWSCLI ec2 describe-flow-logs $PROFILE_OPT --region $regx --filter Name="resource-id",Values="${vpcx}" --query 'FlowLogs[?FlowLogStatus==`ACTIVE`].FlowLogId' --output text 2>&1) - if [[ $(echo "$CHECK_FL" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then - textInfo "$regx: Access Denied trying to describe flow logs in VPC $vpcx" "$regx" "$vpcx" - continue - fi - if [[ $CHECK_FL ]]; then - for FL in $CHECK_FL; do - textPass "$regx: VPC $vpcx VPCFlowLog is enabled for LogGroupName: $FL" "$regx" "$vpcx" - done - else - textFail "$regx: VPC $vpcx VPCFlowLog is disabled" "$regx" "$vpcx" - fi - done - done -} diff --git a/providers/aws/services/vpc/check44 b/providers/aws/services/vpc/check44 deleted file mode 100644 index 158e5c32..00000000 --- a/providers/aws/services/vpc/check44 +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_check44="4.4" -CHECK_TITLE_check44="[check44] Ensure routing tables for VPC peering are \"least access\"" -CHECK_SCORED_check44="NOT_SCORED" -CHECK_CIS_LEVEL_check44="LEVEL2" -CHECK_SEVERITY_check44="Medium" -CHECK_ASFF_TYPE_check44="Software and Configuration Checks/Industry and Regulatory Standards/CIS AWS Foundations Benchmark" -CHECK_ASFF_RESOURCE_TYPE_check44="AwsEc2Vpc" -CHECK_ALTERNATE_check404="check44" -CHECK_SERVICENAME_check44="vpc" -CHECK_RISK_check44='Being highly selective in peering routing tables is a very effective way of minimizing the impact of breach as resources outside of these routes are inaccessible to the peered VPC.' -CHECK_REMEDIATION_check44='Review routing tables of peered VPCs for whether they route all subnets of each VPC and whether that is necessary to accomplish the intended purposes for peering the VPCs.' -CHECK_DOC_check44='https://docs.aws.amazon.com/vpc/latest/peering/peering-configurations-partial-access.html' -CHECK_CAF_EPIC_check44='Infrastructure Security' - -check44(){ - # "Ensure routing tables for VPC peering are \"least access\" (Not Scored)" - for regx in $REGIONS; do - LIST_OF_VPCS_PEERING_CONNECTIONS=$($AWSCLI ec2 describe-vpc-peering-connections --output text $PROFILE_OPT --region $regx --query 'VpcPeeringConnections[*].VpcPeeringConnectionId' 2>&1| sort | paste -s -d" " - ) - if [[ $(echo "$LIST_OF_VPCS_PEERING_CONNECTIONS" | grep -E 'AccessDenied|UnauthorizedOperation') ]]; then - textInfo "$regx: Access Denied trying to describe vpc peering connections" "$regx" - continue - fi - if [[ $LIST_OF_VPCS_PEERING_CONNECTIONS ]];then - textInfo "$regx: $LIST_OF_VPCS_PEERING_CONNECTIONS - review routing tables" "$regx" "$LIST_OF_VPCS_PEERING_CONNECTIONS" - #LIST_OF_VPCS=$($AWSCLI ec2 describe-vpcs $PROFILE_OPT --region $regx --query 'Vpcs[*].VpcId' --output text) - #aws ec2 describe-route-tables --filter "Name=vpc-id,Values=vpc-0213e864" --query "RouteTables[*].{RouteTableId:RouteTableId, VpcId:VpcId, Routes:Routes, AssociatedSubnets:Associations[*].SubnetId}" $PROFILE_OPT --region $regx - # for vpc in $LIST_OF_VPCS; do - # VPCS_WITH_PEERING=$($AWSCLI ec2 describe-route-tables --filter "Name=vpc-id,Values=$vpc" $PROFILE_OPT --region $regx --query "RouteTables[*].{RouteTableId:RouteTableId, VpcId:VpcId, Routes:Routes, AssociatedSubnets:Associations[*].SubnetId}" |grep GatewayId|grep pcx-) - # done - #echo $VPCS_WITH_PEERING - else - textPass "$regx: No VPC peering found" "$regx" "$LIST_OF_VPCS_PEERING_CONNECTIONS" - fi - done -} diff --git a/providers/aws/services/vpc/check_extra789 b/providers/aws/services/vpc/check_extra789 deleted file mode 100644 index 1006a723..00000000 --- a/providers/aws/services/vpc/check_extra789 +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2020) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_extra789="7.89" -CHECK_TITLE_extra789="[extra789] Find trust boundaries in VPC endpoint services connections" -CHECK_SCORED_extra789="NOT_SCORED" -CHECK_CIS_LEVEL_extra789="EXTRA" -CHECK_SEVERITY_extra789="Medium" -CHECK_ASFF_RESOURCE_TYPE_extra789="AwsEc2Vpc" -CHECK_ALTERNATE_extra789="extra789" -CHECK_SERVICENAME_extra789="vpc" -CHECK_RISK_extra789='Account VPC could be linked to other accounts.' -CHECK_REMEDIATION_extra789='In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.' -CHECK_DOC_extra789='https://github.com/toniblyx/prowler/#trust-boundaries-checks' -CHECK_CAF_EPIC_extra789='Infrastructure Security' - -extra789(){ - TRUSTED_ACCOUNT_IDS=$( echo "${ACCOUNT_NUM} ${GROUP_TRUSTBOUNDARIES_TRUSTED_ACCOUNT_IDS}" | xargs ) - - for regx in ${REGIONS}; do - ENDPOINT_SERVICES_IDS=$(${AWSCLI} ec2 describe-vpc-endpoint-services \ - ${PROFILE_OPT} \ - --query "ServiceDetails[?Owner=='${ACCOUNT_NUM}'].ServiceId" \ - --region ${regx} \ - --output text | xargs 2>&1) - if [[ $(echo "$ENDPOINT_SERVICES_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then - textInfo "$regx: Access Denied trying to describe VPC endpoint services" "$regx" - continue - fi - - for ENDPOINT_SERVICE_ID in ${ENDPOINT_SERVICES_IDS}; do - - ENDPOINT_CONNECTION_LIST=$(${AWSCLI} ec2 describe-vpc-endpoint-connections \ - ${PROFILE_OPT} \ - --query "VpcEndpointConnections[?VpcEndpointState=='available'].VpcEndpointOwner" \ - --region ${regx} \ - --output text | xargs - ) - - for ENDPOINT_CONNECTION in ${ENDPOINT_CONNECTION_LIST}; do - for ACCOUNT_ID in ${TRUSTED_ACCOUNT_IDS}; do - if [[ "${ACCOUNT_ID}" == "${ENDPOINT_CONNECTION}" ]]; then - textPass "${regx}: Found trusted account in VPC endpoint service connection ${ENDPOINT_CONNECTION}" "${regx}" "${ENDPOINT_CONNECTION}" - # Algorithm: - # Remove all trusted ACCOUNT_IDs from ENDPOINT_CONNECTION_LIST. - # As a result, the ENDPOINT_CONNECTION_LIST finally contains only unknown/untrusted account ids. - ENDPOINT_CONNECTION_LIST=("${ENDPOINT_CONNECTION_LIST[@]/$ENDPOINT_CONNECTION}") # remove hit from allowlist - fi - done - done - - for UNTRUSTED_CONNECTION in ${ENDPOINT_CONNECTION_LIST}; do - textFail "${regx}: Found untrusted account in VPC endpoint service connection ${UNTRUSTED_CONNECTION}" "${regx}" "${ENDPOINT_CONNECTION}" - done - done - done -} diff --git a/providers/aws/services/vpc/check_extra790 b/providers/aws/services/vpc/check_extra790 deleted file mode 100644 index 79f8bfee..00000000 --- a/providers/aws/services/vpc/check_extra790 +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env bash - -# Prowler - the handy cloud security tool (copyright 2020) by Toni de la Fuente -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy -# of the License at http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. - -CHECK_ID_extra790="7.90" -CHECK_TITLE_extra790="[extra790] Find trust boundaries in VPC endpoint services allowlisted principles" -CHECK_SCORED_extra790="NOT_SCORED" -CHECK_CIS_LEVEL_extra790="EXTRA" -CHECK_SEVERITY_extra790="Medium" -CHECK_ASFF_RESOURCE_TYPE_extra790="AwsEc2Vpc" -CHECK_ALTERNATE_extra790="extra790" -CHECK_SERVICENAME_extra790="vpc" -CHECK_RISK_extra790='Account VPC could be linked to other accounts.' -CHECK_REMEDIATION_extra790='In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.' -CHECK_DOC_extra790='https://github.com/toniblyx/prowler/#trust-boundaries-checks' -CHECK_CAF_EPIC_extra790='Infrastructure Security' - -extra790(){ - TRUSTED_ACCOUNT_IDS=$( echo "${ACCOUNT_NUM} ${GROUP_TRUSTBOUNDARIES_TRUSTED_ACCOUNT_IDS}" | xargs ) - - for regx in ${REGIONS}; do - ENDPOINT_SERVICES_IDS=$(${AWSCLI} ec2 describe-vpc-endpoint-services \ - ${PROFILE_OPT} \ - --query "ServiceDetails[?Owner=='${ACCOUNT_NUM}'].ServiceId" \ - --region ${regx} \ - --output text | xargs 2>&1) - if [[ $(echo "$ENDPOINT_SERVICES_IDS" | grep -E 'AccessDenied|UnauthorizedOperation|AuthorizationError') ]]; then - textInfo "$regx: Access Denied trying to describe VPC endpoint services" "$regx" - continue - fi - - for ENDPOINT_SERVICE_ID in ${ENDPOINT_SERVICES_IDS}; do - ENDPOINT_PERMISSIONS_LIST=$(${AWSCLI} ec2 describe-vpc-endpoint-service-permissions \ - ${PROFILE_OPT} \ - --service-id ${ENDPOINT_SERVICE_ID} \ - --query "AllowedPrincipals[*].Principal" \ - --region ${regx} \ - --output text | xargs - ) - - for ENDPOINT_PERMISSION in ${ENDPOINT_PERMISSIONS_LIST}; do - # Take only account id from ENDPOINT_PERMISSION: arn:aws:iam::965406151242:root - ENDPOINT_PERMISSION_ACCOUNT_ID=$(echo ${ENDPOINT_PERMISSION} | cut -d':' -f5 | xargs) - - for ACCOUNT_ID in ${TRUSTED_ACCOUNT_IDS}; do - if [[ "${ACCOUNT_ID}" == "${ENDPOINT_PERMISSION_ACCOUNT_ID}" ]]; then - textPass "${regx}: Found trusted account in VPC endpoint service permission ${ENDPOINT_PERMISSION}" "${regx}" - # Algorithm: - # Remove all trusted ACCOUNT_IDs from ENDPOINT_PERMISSIONS_LIST. - # As a result, the ENDPOINT_PERMISSIONS_LIST finally contains only unknown/untrusted account ids. - ENDPOINT_PERMISSIONS_LIST=("${ENDPOINT_PERMISSIONS_LIST[@]/$ENDPOINT_PERMISSION}") - fi - done - done - - for UNTRUSTED_PERMISSION in ${ENDPOINT_PERMISSIONS_LIST}; do - textFail "${regx}: Found untrusted account in VPC endpoint service permission ${UNTRUSTED_PERMISSION}" "${regx}" "${UNTRUSTED_PERMISSION}" - done - done - done -} diff --git a/providers/aws/services/vpc/vpc_client.py b/providers/aws/services/vpc/vpc_client.py new file mode 100644 index 00000000..124abcc9 --- /dev/null +++ b/providers/aws/services/vpc/vpc_client.py @@ -0,0 +1,4 @@ +from providers.aws.lib.audit_info.audit_info import current_audit_info +from providers.aws.services.vpc.vpc_service import VPC + +vpc_client = VPC(current_audit_info) diff --git a/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/__init__.py b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.metadata.json b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.metadata.json new file mode 100644 index 00000000..16ac8d2d --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "vpc_endpoint_connections_trust_boundaries", + "CheckTitle": "Find trust boundaries in VPC endpoint connections.", + "CheckType": ["Infrastructure Security"], + "ServiceName": "vpc", + "SubServiceName": "endpoint", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Find trust boundaries in VPC endpoint connections.", + "Risk": "Account VPC could be linked to other accounts.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://docs.bridgecrew.io/docs/networking_9#aws-vpc-endpoints-are-exposed", + "Terraform": "" + }, + "Recommendation": { + "Text": "In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.", + "Url": "https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-access.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py new file mode 100644 index 00000000..2b990908 --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries.py @@ -0,0 +1,45 @@ +from config.config import get_config_var +from lib.check.models import Check, Check_Report +from providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_endpoint_connections_trust_boundaries(Check): + def execute(self): + findings = [] + # Get trusted account_ids from config.yaml + trusted_account_ids = get_config_var("trusted_account_ids") + for endpoint in vpc_client.vpc_endpoints: + # Check VPC endpoint policy + for statement in endpoint.policy_document["Statement"]: + if "*" == statement["Principal"]: + report = Check_Report(self.metadata) + report.region = endpoint.region + report.status = "FAIL" + report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has full access." + report.resource_id = endpoint.id + findings.append(report) + break + + else: + if type(statement["Principal"]["AWS"]) == str: + principals = [statement["Principal"]["AWS"]] + else: + principals = statement["Principal"]["AWS"] + for principal_arn in principals: + account_id = principal_arn.split(":")[4] + report = Check_Report(self.metadata) + report.region = endpoint.region + if ( + account_id in trusted_account_ids + or account_id in vpc_client.audited_account + ): + report.status = "PASS" + report.status_extended = f"Found trusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}." + report.resource_id = endpoint.id + else: + report.status = "FAIL" + report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id}." + report.resource_id = endpoint.id + findings.append(report) + + return findings diff --git a/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py new file mode 100644 index 00000000..886303f4 --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_connections_trust_boundaries/vpc_endpoint_connections_trust_boundaries_test.py @@ -0,0 +1,272 @@ +import json +from unittest import mock + +from boto3 import client +from moto import mock_ec2 + +AWS_REGION = "us-east-1" +ACCOUNT_ID = "123456789012" + + +def mock_get_config_var(config_var): + if config_var == "trusted_account_ids": + return ["123456789010"] + return [] + + +class Test_vpc_endpoint_connections_trust_boundaries: + @mock_ec2 + def test_vpc_no_endpoints(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 0 + + @mock_ec2 + def test_vpc_endpoint_with_full_access(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Action": "*", + "Effect": "Allow", + "Principal": "*", + "Resource": "*", + } + ] + } + ), + ) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']} has full access." + ) + assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_trusted_account(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::123456789012:root"}, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + current_audit_info.audited_account = ACCOUNT_ID + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account {ACCOUNT_ID} in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_endpoint_with_untrusted_account(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::123456789010:root"}, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + current_audit_info.audited_account = ACCOUNT_ID + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Found untrusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert result[0].resource_id == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + + @mock_ec2 + def test_vpc_endpoint_with_config_trusted_account(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + vpc_endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::123456789010:root"}, + "Action": "*", + "Resource": "*", + } + ] + } + ), + ) + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + current_audit_info.audited_account = ACCOUNT_ID + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.get_config_var", + new=mock_get_config_var, + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Found trusted account 123456789010 in VPC Endpoint {vpc_endpoint['VpcEndpoint']['VpcEndpointId']} in VPC {vpc['VpcId']}." + ) + assert ( + result[0].resource_id + == vpc_endpoint["VpcEndpoint"]["VpcEndpointId"] + ) + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries.vpc_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_connections_trust_boundaries.vpc_endpoint_connections_trust_boundaries import ( + vpc_endpoint_connections_trust_boundaries, + ) + + check = vpc_endpoint_connections_trust_boundaries() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/__init__.py b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.metadata.json b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.metadata.json new file mode 100644 index 00000000..d8886635 --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "vpc_endpoint_services_allowed_principals_trust_boundaries", + "CheckTitle": "Find trust boundaries in VPC endpoint services allowlisted principles.", + "CheckType": ["Infrastructure Security"], + "ServiceName": "vpc", + "SubServiceName": "service_endpoint", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Find trust boundaries in VPC endpoint services allowlisted principles.", + "Risk": "Account VPC could be linked to other accounts.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://docs.bridgecrew.io/docs/networking_9#aws-vpc-endpoints-are-exposed", + "Terraform": "" + }, + "Recommendation": { + "Text": "In multi Account environments identify untrusted links. Check trust chaining and dependencies between accounts.", + "Url": "https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-access.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py new file mode 100644 index 00000000..66d5c7d8 --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries.py @@ -0,0 +1,39 @@ +from config.config import get_config_var +from lib.check.models import Check, Check_Report +from providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_endpoint_services_allowed_principals_trust_boundaries(Check): + def execute(self): + findings = [] + # Get trusted account_ids from config.yaml + trusted_account_ids = get_config_var("trusted_account_ids") + for service in vpc_client.vpc_endpoint_services: + if not service.allowed_principals: + report = Check_Report(self.metadata) + report.region = service.region + report.status = "PASS" + report.status_extended = ( + f"VPC Endpoint Service {service.id} has no allowed principals." + ) + report.resource_id = service.id + findings.append(report) + else: + for principal in service.allowed_principals: + account_id = principal.split(":")[4] + report = Check_Report(self.metadata) + report.region = service.region + if ( + account_id in trusted_account_ids + or account_id in vpc_client.audited_account + ): + report.status = "PASS" + report.status_extended = f"Found trusted account {account_id} in VPC Endpoint Service {service.id}." + report.resource_id = service.id + else: + report.status = "FAIL" + report.status_extended = f"Found untrusted account {account_id} in VPC Endpoint Service {service.id}." + report.resource_id = service.id + findings.append(report) + + return findings diff --git a/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py new file mode 100644 index 00000000..d6971ebd --- /dev/null +++ b/providers/aws/services/vpc/vpc_endpoint_services_allowed_principals_trust_boundaries/vpc_endpoint_services_allowed_principals_trust_boundaries_test.py @@ -0,0 +1,128 @@ +from unittest import mock + +import botocore +from boto3 import client +from mock import patch +from moto import mock_ec2, mock_elbv2 + +AWS_REGION = "us-east-1" +ACCOUNT_ID = "123456789012" + +# Mocking VPC Calls +make_api_call = botocore.client.BaseClient._make_api_call +# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 +# +# We have to mock every AWS API call using Boto3 +def mock_make_api_call(self, operation_name, kwarg): + if operation_name == "DescribeVpcEndpointServices": + return { + "ServiceDetails": [ + { + "ServiceId": "vpce-svc-4b919ac5", + "ServiceName": "string", + "Owner": ACCOUNT_ID, + "StageName": "test-stage", + } + ] + } + return make_api_call(self, operation_name, kwarg) + + +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_vpc_endpoint_services_allowed_principals_trust_boundaries: + @mock_ec2 + def test_vpc_no_endpoint_services(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 23 # one endpoint per region + + @mock_ec2 + @mock_elbv2 + def test_vpc_endpoint_service_without_allowed_principals(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + elbv2_client = client("elbv2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + # Service is mocked until moto fix the issue https://github.com/spulec/moto/issues/5605 + # service = ec2_client.create_vpc_endpoint_service_configuration( + # NetworkLoadBalancerArns=[lb_arn] + # ) + + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 23 # one per region + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Endpoint Service {ec2_client.describe_vpc_endpoint_services()['ServiceDetails'][0]['ServiceId']} has no allowed principals." + ) + assert ( + result[0].resource_id + == ec2_client.describe_vpc_endpoint_services()["ServiceDetails"][0][ + "ServiceId" + ] + ) + + @mock_ec2 + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.vpc.vpc_endpoint_services_allowed_principals_trust_boundaries.vpc_endpoint_services_allowed_principals_trust_boundaries import ( + vpc_endpoint_services_allowed_principals_trust_boundaries, + ) + + check = vpc_endpoint_services_allowed_principals_trust_boundaries() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/vpc/vpc_flow_logs_enabled/__init__.py b/providers/aws/services/vpc/vpc_flow_logs_enabled/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.metadata.json b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.metadata.json new file mode 100644 index 00000000..0bcf0e08 --- /dev/null +++ b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "vpc_flow_logs_enabled", + "CheckTitle": "Ensure VPC Flow Logging is Enabled in all VPCs.", + "CheckType": ["Logging and Monitoring"], + "ServiceName": "vpc", + "SubServiceName": "flow_log", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Ensure VPC Flow Logging is Enabled in all VPCs.", + "Risk": "VPC Flow Logs provide visibility into network traffic that traverses the VPC and can be used to detect anomalous traffic or insight during security workflows.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://docs.bridgecrew.io/docs/logging_9-enable-vpc-flow-logging#aws-console", + "Terraform": "https://docs.bridgecrew.io/docs/logging_9-enable-vpc-flow-logging#terraform" + }, + "Recommendation": { + "Text": "It is recommended that VPC Flow Logs be enabled for packet Rejects for VPCs.", + "Url": "http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.py b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.py new file mode 100644 index 00000000..2929a05f --- /dev/null +++ b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled.py @@ -0,0 +1,21 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_flow_logs_enabled(Check): + def execute(self): + findings = [] + for vpc in vpc_client.vpcs: + report = Check_Report(self.metadata) + report.region = vpc.region + if vpc.flow_log: + report.status = "PASS" + report.status_extended = f"VPC {vpc.id} Flow logs are enabled." + report.resource_id = vpc.id + else: + report.status = "FAIL" + report.status_extended = f"VPC {vpc.id} Flow logs are disabled." + report.resource_id = vpc.id + findings.append(report) + + return findings diff --git a/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py new file mode 100644 index 00000000..bc5ce7c7 --- /dev/null +++ b/providers/aws/services/vpc/vpc_flow_logs_enabled/vpc_flow_logs_enabled_test.py @@ -0,0 +1,127 @@ +from unittest import mock + +from boto3 import client +from moto import mock_ec2 + +AWS_REGION = "us-east-1" +ACCOUNT_ID = "123456789012" + + +class Test_vpc_flow_logs_enabled: + @mock_ec2 + def test_vpc_only_default_vpcs(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( + vpc_flow_logs_enabled, + ) + + check = vpc_flow_logs_enabled() + result = check.execute() + + assert ( + len(result) == 23 + ) # Number of AWS regions, one default VPC per region + + @mock_ec2 + def test_vpc_with_flow_logs(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + ec2_client.create_flow_logs( + ResourceType="VPC", + ResourceIds=[vpc["VpcId"]], + TrafficType="ALL", + LogDestinationType="cloud-watch-logs", + LogGroupName="test_logs", + DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role", + ) + + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( + vpc_flow_logs_enabled, + ) + + check = vpc_flow_logs_enabled() + result = check.execute() + + # Search created VPC among default ones + for result in result: + if result.resource_id == vpc["VpcId"]: + assert result.status == "PASS" + assert ( + result.status_extended + == f"VPC {vpc['VpcId']} Flow logs are enabled." + ) + assert result.resource_id == vpc["VpcId"] + + @mock_ec2 + def test_vpc_without_flow_logs(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( + vpc_flow_logs_enabled, + ) + + check = vpc_flow_logs_enabled() + result = check.execute() + + # Search created VPC among default ones + for result in result: + if result.resource_id == vpc["VpcId"]: + assert result.status == "FAIL" + assert ( + result.status_extended + == f"VPC {vpc['VpcId']} Flow logs are disabled." + ) + assert result.resource_id == vpc["VpcId"] + + @mock_ec2 + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled.vpc_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.vpc.vpc_flow_logs_enabled.vpc_flow_logs_enabled import ( + vpc_flow_logs_enabled, + ) + + check = vpc_flow_logs_enabled() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/__init__.py b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.metadata.json b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.metadata.json new file mode 100644 index 00000000..947e4cfa --- /dev/null +++ b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.metadata.json @@ -0,0 +1,35 @@ +{ + "Provider": "aws", + "CheckID": "vpc_peering_routing_tables_with_least_privilege", + "CheckTitle": "Ensure routing tables for VPC peering are least access.", + "CheckType": ["Infrastructure Security"], + "ServiceName": "vpc", + "SubServiceName": "route_table", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Ensure routing tables for VPC peering are least access.", + "Risk": "Being highly selective in peering routing tables is a very effective way of minimizing the impact of breach as resources outside of these routes are inaccessible to the peered VPC.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "https://docs.bridgecrew.io/docs/networking_5#cli-command", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Review routing tables of peered VPCs for whether they route all subnets of each VPC and whether that is necessary to accomplish the intended purposes for peering the VPCs.", + "Url": "https://docs.aws.amazon.com/vpc/latest/peering/peering-configurations-partial-access.html" + } + }, + "Categories": [], + "Tags": { + "Tag1Key": "value", + "Tag2Key": "value" + }, + "DependsOn": [], + "RelatedTo": [], + "Notes": "", + "Compliance": [] +} diff --git a/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.py b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.py new file mode 100644 index 00000000..534bcc5f --- /dev/null +++ b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege.py @@ -0,0 +1,31 @@ +from lib.check.models import Check, Check_Report +from providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_peering_routing_tables_with_least_privilege(Check): + def execute(self): + findings = [] + for peer in vpc_client.vpc_peering_connections: + report = Check_Report(self.metadata) + report.region = peer.region + comply = True + # Check each cidr in the peering route table + for route_table in peer.route_tables: + for cidr in route_table.destination_cidrs: + if ( + cidr == "0.0.0.0/0" + or cidr == peer.requester_cidr + or cidr == peer.accepter_cidr + ): # Check if cidr does not accept whole requester/accepter VPC CIDR + comply = False + if not comply: + report.status = "FAIL" + report.status_extended = f"VPC Peering Connection {peer.id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables." + report.resource_id = peer.id + else: + report.status = "PASS" + report.status_extended = f"VPC Peering Connection {peer.id} comply with least privilege access." + report.resource_id = peer.id + findings.append(report) + + return findings diff --git a/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py new file mode 100644 index 00000000..fea302ba --- /dev/null +++ b/providers/aws/services/vpc/vpc_peering_routing_tables_with_least_privilege/vpc_peering_routing_tables_with_least_privilege_test.py @@ -0,0 +1,172 @@ +from unittest import mock + +from boto3 import client, resource +from moto import mock_ec2 + +AWS_REGION = "us-east-1" +ACCOUNT_ID = "123456789012" + + +class Test_vpc_peering_routing_tables_with_least_privilege: + @mock_ec2 + def test_vpc_no_peering_connections(self): + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, + ) + + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == 0 + + @mock_ec2 + def test_vpc_comply_peering_connection_(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + ec2_resource = resource("ec2", region_name=AWS_REGION) + + # Create VPCs peers as well as a comply route + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16") + vpc_pcx = ec2_client.create_vpc_peering_connection( + VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"] + ) + vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"] + + vpc_pcx = ec2_client.accept_vpc_peering_connection( + VpcPeeringConnectionId=vpc_pcx_id + ) + main_route_table_id = ec2_client.describe_route_tables( + Filters=[ + {"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]}, + {"Name": "association.main", "Values": ["true"]}, + ] + )["RouteTables"][0]["RouteTableId"] + main_route_table = ec2_resource.RouteTable(main_route_table_id) + main_route_table.create_route( + DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id + ) + + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC, Route + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ) as service_client: + # Test Check + from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, + ) + + service_client.vpc_peering_connections[0].route_tables = [ + Route( + main_route_table_id, + ["10.12.23.44/32"], + ) + ] + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == len( + ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"] + ) + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC Peering Connection {vpc_pcx_id} comply with least privilege access." + ) + assert result[0].resource_id == vpc_pcx_id + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_vpc_not_comply_peering_connection_(self): + # Create VPC Mocked Resources + ec2_client = client("ec2", region_name=AWS_REGION) + ec2_resource = resource("ec2", region_name=AWS_REGION) + + # Create VPCs peers as well as a comply route + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16") + vpc_pcx = ec2_client.create_vpc_peering_connection( + VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"] + ) + vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"] + + vpc_pcx = ec2_client.accept_vpc_peering_connection( + VpcPeeringConnectionId=vpc_pcx_id + ) + main_route_table_id = ec2_client.describe_route_tables( + Filters=[ + {"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]}, + {"Name": "association.main", "Values": ["true"]}, + ] + )["RouteTables"][0]["RouteTableId"] + main_route_table = ec2_resource.RouteTable(main_route_table_id) + main_route_table.create_route( + DestinationCidrBlock="10.0.0.0/16", VpcPeeringConnectionId=vpc_pcx_id + ) + + from providers.aws.lib.audit_info.audit_info import current_audit_info + from providers.aws.services.vpc.vpc_service import VPC, Route + + current_audit_info.audited_partition = "aws" + + with mock.patch( + "providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=VPC(current_audit_info), + ) as service_client: + # Test Check + from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, + ) + + service_client.vpc_peering_connections[0].route_tables = [ + Route( + main_route_table_id, + ["10.0.0.0/16"], + ) + ] + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == len( + ec2_client.describe_vpc_peering_connections()["VpcPeeringConnections"] + ) + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC Peering Connection {vpc_pcx_id} does not comply with least privilege access since it accepts whole VPCs CIDR in its route tables." + ) + assert result[0].resource_id == vpc_pcx_id + assert result[0].region == AWS_REGION + + @mock_ec2 + def test_bad_response(self): + mock_client = mock.MagicMock() + + with mock.patch( + "providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege.vpc_client", + new=mock_client, + ): + # Test Check + from providers.aws.services.vpc.vpc_peering_routing_tables_with_least_privilege.vpc_peering_routing_tables_with_least_privilege import ( + vpc_peering_routing_tables_with_least_privilege, + ) + + check = vpc_peering_routing_tables_with_least_privilege() + result = check.execute() + + assert len(result) == 0 diff --git a/providers/aws/services/vpc/vpc_service.py b/providers/aws/services/vpc/vpc_service.py new file mode 100644 index 00000000..f3387da0 --- /dev/null +++ b/providers/aws/services/vpc/vpc_service.py @@ -0,0 +1,306 @@ +import json +import threading +from dataclasses import dataclass + +from lib.logger import logger +from providers.aws.aws_provider import generate_regional_clients + + +################## VPC +class VPC: + def __init__(self, audit_info): + self.service = "ec2" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.regional_clients = generate_regional_clients(self.service, audit_info) + self.vpcs = [] + self.vpc_peering_connections = [] + self.vpc_endpoints = [] + self.vpc_endpoint_services = [] + self.__threading_call__(self.__describe_vpcs__) + self.__threading_call__(self.__describe_vpc_peering_connections__) + self.__threading_call__(self.__describe_vpc_endpoints__) + self.__threading_call__(self.__describe_vpc_endpoint_services__) + self.__describe_flow_logs__() + self.__describe_route_tables__() + self.__describe_vpc_endpoint_service_permissions__() + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __describe_vpcs__(self, regional_client): + logger.info("VPC - Describing VPCs...") + try: + describe_vpcs_paginator = regional_client.get_paginator("describe_vpcs") + for page in describe_vpcs_paginator.paginate(): + for vpc in page["Vpcs"]: + self.vpcs.append( + VPCs( + vpc["VpcId"], + vpc["IsDefault"], + vpc["CidrBlock"], + regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_vpc_peering_connections__(self, regional_client): + logger.info("VPC - Describing VPC Peering Connections...") + try: + describe_vpc_peering_connections_paginator = regional_client.get_paginator( + "describe_vpc_peering_connections" + ) + for page in describe_vpc_peering_connections_paginator.paginate(): + for conn in page["VpcPeeringConnections"]: + self.vpc_peering_connections.append( + VpcPeeringConnection( + conn["VpcPeeringConnectionId"], + conn["AccepterVpcInfo"]["VpcId"], + conn["AccepterVpcInfo"]["CidrBlock"], + conn["RequesterVpcInfo"]["VpcId"], + conn["RequesterVpcInfo"]["CidrBlock"], + regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_route_tables__(self): + logger.info("VPC - Describing Peering Route Tables...") + try: + for conn in self.vpc_peering_connections: + regional_client = self.regional_clients[conn.region] + for route_table in regional_client.describe_route_tables( + Filters=[ + { + "Name": "route.vpc-peering-connection-id", + "Values": [ + conn.id, + ], + }, + ] + )["RouteTables"]: + destination_cidrs = [] + for route in route_table["Routes"]: + if ( + route["Origin"] != "CreateRouteTable" + ): # avoid default route table + destination_cidrs.append(route["DestinationCidrBlock"]) + conn.route_tables.append( + Route( + route_table["RouteTableId"], + destination_cidrs, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_flow_logs__(self): + logger.info("VPC - Describing flow logs...") + try: + for vpc in self.vpcs: + regional_client = self.regional_clients[vpc.region] + flow_logs = regional_client.describe_flow_logs( + Filters=[ + { + "Name": "resource-id", + "Values": [ + vpc.id, + ], + }, + ] + )["FlowLogs"] + if flow_logs: + vpc.flow_log = True + except Exception as error: + logger.error(f"{error.__class__.__name__}: {error}") + + def __describe_vpc_endpoints__(self, regional_client): + logger.info("VPC - Describing VPC Endpoints...") + try: + describe_vpc_endpoints_paginator = regional_client.get_paginator( + "describe_vpc_endpoints" + ) + for page in describe_vpc_endpoints_paginator.paginate(): + for endpoint in page["VpcEndpoints"]: + self.vpc_endpoints.append( + VpcEndpoint( + endpoint["VpcEndpointId"], + endpoint["VpcId"], + endpoint["State"], + json.loads(endpoint["PolicyDocument"]), + endpoint["OwnerId"], + regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_vpc_endpoint_services__(self, regional_client): + logger.info("VPC - Describing VPC Endpoint Services...") + try: + describe_vpc_endpoint_services_paginator = regional_client.get_paginator( + "describe_vpc_endpoint_services" + ) + for page in describe_vpc_endpoint_services_paginator.paginate(): + for endpoint in page["ServiceDetails"]: + if endpoint["Owner"] != "amazon": + self.vpc_endpoint_services.append( + VpcEndpointService( + endpoint["ServiceId"], + endpoint["ServiceName"], + endpoint["Owner"], + regional_client.region, + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_vpc_endpoint_service_permissions__(self): + logger.info("VPC - Describing VPC Endpoint service permissions...") + try: + for service in self.vpc_endpoint_services: + regional_client = self.regional_clients[service.region] + for ( + principal + ) in regional_client.describe_vpc_endpoint_service_permissions( + ServiceId=service.id + )[ + "AllowedPrincipals" + ]: + service.allowed_principals.append(principal["Principal"]) + except Exception as error: + logger.error(f"{error.__class__.__name__}: {error}") + + +@dataclass +class VPCs: + id: str + default: bool + cidr_block: str + flow_log: bool + region: str + + def __init__( + self, + id, + default, + cidr_block, + region, + ): + self.id = id + self.default = default + self.cidr_block = cidr_block + self.flow_log = False + self.region = region + + +@dataclass +class Route: + id: str + destination_cidrs: list[str] + + def __init__( + self, + id, + destination_cidrs, + ): + self.id = id + self.destination_cidrs = destination_cidrs + + +@dataclass +class VpcPeeringConnection: + id: str + accepter_vpc: str + accepter_cidr: str + requester_vpc: str + requester_cidr: str + route_tables: list[Route] + region: str + + def __init__( + self, + id, + accepter_vpc, + accepter_cidr, + requester_vpc, + requester_cidr, + region, + ): + self.id = id + self.accepter_vpc = accepter_vpc + self.accepter_cidr = accepter_cidr + self.requester_vpc = requester_vpc + self.requester_cidr = requester_cidr + self.route_tables = [] + self.region = region + + +@dataclass +class VpcEndpoint: + id: str + vpc_id: str + state: str + policy_document: dict + owner_id: list[Route] + region: str + + def __init__( + self, + id, + vpc_id, + state, + policy_document, + owner_id, + region, + ): + self.id = id + self.vpc_id = vpc_id + self.state = state + self.policy_document = policy_document + self.owner_id = owner_id + self.route_tables = [] + self.region = region + + +@dataclass +class VpcEndpointService: + id: str + service: str + owner_id: str + allowed_principals: list + region: str + + def __init__( + self, + id, + service, + owner_id, + region, + ): + self.id = id + self.service = service + self.owner_id = owner_id + self.allowed_principals = [] + self.region = region diff --git a/providers/aws/services/vpc/vpc_service_test.py b/providers/aws/services/vpc/vpc_service_test.py new file mode 100644 index 00000000..287578ec --- /dev/null +++ b/providers/aws/services/vpc/vpc_service_test.py @@ -0,0 +1,258 @@ +import json + +from boto3 import client, resource, session +from moto import mock_ec2, mock_elbv2 + +from providers.aws.lib.audit_info.models import AWS_Audit_Info +from providers.aws.services.vpc.vpc_service import VPC, Route + +AWS_ACCOUNT_NUMBER = 123456789012 +AWS_REGION = "us-east-1" + + +class Test_VPC_Service: + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + ) + return audit_info + + # Test VPC Service + @mock_ec2 + def test_service(self): + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert vpc.service == "ec2" + + # Test VPC Client + @mock_ec2 + def test_client(self): + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + for client in vpc.regional_clients.values(): + assert client.__class__.__name__ == "EC2" + + # Test VPC Session + @mock_ec2 + def test__get_session__(self): + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert vpc.session.__class__.__name__ == "Session" + + # Test VPC Session + @mock_ec2 + def test_audited_account(self): + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert vpc.audited_account == AWS_ACCOUNT_NUMBER + + # Test VPC Describe VPCs + @mock_ec2 + def test__describe_vpcs__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + # Create VPC + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert ( + len(vpc.vpcs) == 24 + ) # Number of AWS regions + created VPC, one default VPC per region + + # Test VPC Describe Flow Logs + @mock_ec2 + def test__describe_flow_logs__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + new_vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + # Create VPC Flow log + ec2_client.create_flow_logs( + ResourceType="VPC", + ResourceIds=[new_vpc["VpcId"]], + TrafficType="ALL", + LogDestinationType="cloud-watch-logs", + LogGroupName="test_logs", + DeliverLogsPermissionArn="arn:aws:iam::" + + str(AWS_ACCOUNT_NUMBER) + + ":role/test-role", + ) + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + # Search created VPC among default ones + for vpc in vpc.vpcs: + if vpc.id == new_vpc["VpcId"]: + assert vpc.flow_log == True + + # Test VPC Describe VPC Peering connections + @mock_ec2 + def test__describe_vpc_peering_connections__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + # Create VPCs peers + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16") + vpc_pcx = ec2_client.create_vpc_peering_connection( + VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"] + ) + vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"] + + vpc_pcx = ec2_client.accept_vpc_peering_connection( + VpcPeeringConnectionId=vpc_pcx_id + ) + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert len(vpc.vpc_peering_connections) == 1 + assert vpc.vpc_peering_connections[0].id == vpc_pcx_id + + # Test VPC Describe VPC Peering connections + @mock_ec2 + def test__describe_vpc_peering_connections__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + # Create VPCs peers + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16") + vpc_pcx = ec2_client.create_vpc_peering_connection( + VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"] + ) + vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"] + + vpc_pcx = ec2_client.accept_vpc_peering_connection( + VpcPeeringConnectionId=vpc_pcx_id + ) + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert len(vpc.vpc_peering_connections) == 1 + assert vpc.vpc_peering_connections[0].id == vpc_pcx_id + + # Test VPC Describe VPC Peering connections + @mock_ec2 + def test__describe_route_tables__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + ec2_resource = resource("ec2", region_name=AWS_REGION) + + # Create VPCs peers as well as a route + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + peer_vpc = ec2_client.create_vpc(CidrBlock="11.0.0.0/16") + vpc_pcx = ec2_client.create_vpc_peering_connection( + VpcId=vpc["Vpc"]["VpcId"], PeerVpcId=peer_vpc["Vpc"]["VpcId"] + ) + vpc_pcx_id = vpc_pcx["VpcPeeringConnection"]["VpcPeeringConnectionId"] + + vpc_pcx = ec2_client.accept_vpc_peering_connection( + VpcPeeringConnectionId=vpc_pcx_id + ) + main_route_table_id = ec2_client.describe_route_tables( + Filters=[ + {"Name": "vpc-id", "Values": [vpc["Vpc"]["VpcId"]]}, + {"Name": "association.main", "Values": ["true"]}, + ] + )["RouteTables"][0]["RouteTableId"] + # FilterNotImplementedError: The filter 'route.vpc-peering-connection-id' for DescribeRouteTables has not been implemented in Moto yet. + # main_route_table = ec2_resource.RouteTable(main_route_table_id) + # main_route_table.create_route( + # DestinationCidrBlock="10.0.0.4/24", VpcPeeringConnectionId=vpc_pcx_id + # ) + + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + vpc.vpc_peering_connections[0].route_tables = [ + Route( + main_route_table_id, + ["10.0.0.4/24"], + ) + ] + assert len(vpc.vpc_peering_connections[0].route_tables) == 1 + assert vpc.vpc_peering_connections[0].id == vpc_pcx_id + + # Test VPC Describe VPC Endpoints + @mock_ec2 + def test__describe_vpc_endpoints__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + # Create VPC endpoint + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"] + + route_table = ec2_client.create_route_table(VpcId=vpc["VpcId"])["RouteTable"] + endpoint = ec2_client.create_vpc_endpoint( + VpcId=vpc["VpcId"], + ServiceName="com.amazonaws.us-east-1.s3", + RouteTableIds=[route_table["RouteTableId"]], + VpcEndpointType="Gateway", + PolicyDocument=json.dumps( + { + "Statement": [ + { + "Action": "*", + "Effect": "Allow", + "Principal": "*", + "Resource": "*", + } + ] + } + ), + )["VpcEndpoint"]["VpcEndpointId"] + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert len(vpc.vpc_endpoints) == 1 + assert vpc.vpc_endpoints[0].id == endpoint + + # Test VPC Describe VPC Endpoint Services + @mock_ec2 + @mock_elbv2 + def test__describe_vpc_endpoint_services__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + elbv2_client = client("elbv2", region_name=AWS_REGION) + + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + lb_name = "lb_vpce-test" + lb_arn = elbv2_client.create_load_balancer( + Name=lb_name, + Subnets=[subnet["Subnet"]["SubnetId"]], + Scheme="internal", + Type="network", + )["LoadBalancers"][0]["LoadBalancerArn"] + + service = ec2_client.create_vpc_endpoint_service_configuration( + NetworkLoadBalancerArns=[lb_arn] + ) + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert ( + len(vpc.vpc_endpoint_services) == 0 + ) # Wait until this issue is fixed https://github.com/spulec/moto/issues/5605