From b3656761eb34c70f6e3fc687dba3a086311829d1 Mon Sep 17 00:00:00 2001 From: Gabriel Soltz Date: Wed, 19 Apr 2023 12:01:12 +0200 Subject: [PATCH] feat(check): New VPC checks (#2218) --- .../vpc/vpc_different_regions/__init__.py | 0 .../vpc_different_regions.metadata.json | 32 +++ .../vpc_different_regions.py | 26 ++ .../providers/aws/services/vpc/vpc_service.py | 100 +++++++- .../vpc/vpc_subnet_different_az/__init__.py | 0 .../vpc_subnet_different_az.metadata.json | 32 +++ .../vpc_subnet_different_az.py | 32 +++ .../__init__.py | 0 ...bnet_separate_private_public.metadata.json | 32 +++ .../vpc_subnet_separate_private_public.py | 36 +++ .../vpc_different_regions_test.py | 117 +++++++++ .../aws/services/vpc/vpc_service_test.py | 31 +++ .../vpc_subnet_different_az_test.py | 183 ++++++++++++++ ...vpc_subnet_separate_private_public_test.py | 229 ++++++++++++++++++ 14 files changed, 845 insertions(+), 5 deletions(-) create mode 100644 prowler/providers/aws/services/vpc/vpc_different_regions/__init__.py create mode 100644 prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.metadata.json create mode 100644 prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.py create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_different_az/__init__.py create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.metadata.json create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.py create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/__init__.py create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.metadata.json create mode 100644 prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.py create mode 100644 tests/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions_test.py create mode 100644 tests/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az_test.py create mode 100644 tests/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public_test.py diff --git a/prowler/providers/aws/services/vpc/vpc_different_regions/__init__.py b/prowler/providers/aws/services/vpc/vpc_different_regions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.metadata.json b/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.metadata.json new file mode 100644 index 00000000..5b505e18 --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "vpc_different_regions", + "CheckTitle": "Ensure there are vpcs in more than one region", + "CheckType": [ + "Infrastructure Security" + ], + "ServiceName": "vpc", + "SubServiceName": "subnet", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Ensure there are vpcs in more than one region", + "Risk": "", + "RelatedUrl": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Scenario2.html", + "Remediation": { + "Code": { + "CLI": "aws ec2 create-vpc", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure there are vpcs in more than one region", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.py b/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.py new file mode 100644 index 00000000..90103590 --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions.py @@ -0,0 +1,26 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_different_regions(Check): + def execute(self): + findings = [] + region = None + for vpc in vpc_client.vpcs: + if not vpc.default: + report = Check_Report_AWS(self.metadata()) + # This is a global check under the vpc service: region, resource_id and tags are not relevant here but we keep them for consistency + report.region = vpc.region + report.resource_id = vpc.id + report.resource_tags = vpc.tags + report.status = "FAIL" + report.status_extended = f"VPCs found only in one region {vpc.region}." + if region and vpc.region != region: + report.status = "PASS" + report.status_extended = "VPCs found in more than one region." + break + region = vpc.region + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/vpc/vpc_service.py b/prowler/providers/aws/services/vpc/vpc_service.py index 9a8b37b8..236ac9ce 100644 --- a/prowler/providers/aws/services/vpc/vpc_service.py +++ b/prowler/providers/aws/services/vpc/vpc_service.py @@ -26,8 +26,10 @@ class VPC: self.__threading_call__(self.__describe_vpc_endpoints__) self.__threading_call__(self.__describe_vpc_endpoint_services__) self.__describe_flow_logs__() - self.__describe_route_tables__() + self.__describe_peering_route_tables__() self.__describe_vpc_endpoint_service_permissions__() + self.vpc_subnets = [] + self.__threading_call__(self.__describe_vpc_subnets__) def __get_session__(self): return self.session @@ -96,7 +98,7 @@ class VPC: f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __describe_route_tables__(self): + def __describe_peering_route_tables__(self): logger.info("VPC - Describing Peering Route Tables...") try: for conn in self.vpc_peering_connections: @@ -124,9 +126,10 @@ class VPC: destination_cidrs=destination_cidrs, ) ) + except Exception as error: logger.error( - f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" ) def __describe_flow_logs__(self): @@ -146,8 +149,11 @@ class VPC: )["FlowLogs"] if flow_logs: vpc.flow_log = True + except Exception as error: - logger.error(f"{error.__class__.__name__}: {error}") + logger.error( + f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) def __describe_vpc_endpoints__(self, regional_client): logger.info("VPC - Describing VPC Endpoints...") @@ -223,7 +229,90 @@ class VPC: ]: service.allowed_principals.append(principal["Principal"]) except Exception as error: - logger.error(f"{error.__class__.__name__}: {error}") + logger.error( + f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + def __describe_vpc_subnets__(self, regional_client): + logger.info("VPC - Describing VPC subnets...") + try: + describe_subnets_paginator = regional_client.get_paginator( + "describe_subnets" + ) + for page in describe_subnets_paginator.paginate(): + for subnet in page["Subnets"]: + if not self.audit_resources or ( + is_resource_filtered(subnet["SubnetId"], self.audit_resources) + ): + try: + # Check the route table associated with the subnet to see if it's public + regional_client_for_subnet = self.regional_clients[ + regional_client.region + ] + route_tables_for_subnet = ( + regional_client_for_subnet.describe_route_tables( + Filters=[ + { + "Name": "association.subnet-id", + "Values": [subnet["SubnetId"]], + } + ] + ) + ) + if not route_tables_for_subnet.get("RouteTables"): + # If a subnet is not explicitly associated with any route table, it is implicitly associated with the main route table. + route_tables_for_subnet = ( + regional_client_for_subnet.describe_route_tables( + Filters=[ + { + "Name": "association.main", + "Values": ["true"], + } + ] + ) + ) + public = False + for route in route_tables_for_subnet.get("RouteTables")[ + 0 + ].get("Routes"): + if "GatewayId" in route and "igw" in route["GatewayId"]: + public = True + break + # Add it to to list of vpc_subnets and to the VPC object + object = VpcSubnet( + id=subnet["SubnetId"], + default=subnet["DefaultForAz"], + vpc_id=subnet["VpcId"], + cidr_block=subnet["CidrBlock"], + region=regional_client.region, + availability_zone=subnet["AvailabilityZone"], + public=public, + tags=subnet.get("Tags"), + ) + self.vpc_subnets.append(object) + # Add it to the VPC object + for vpc in self.vpcs: + if vpc.id == subnet["VpcId"]: + vpc.subnets.append(object) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class VpcSubnet(BaseModel): + id: str + default: bool + vpc_id: str + cidr_block: str + availability_zone: str + public: bool + region: str + tags: Optional[list] = [] class VPCs(BaseModel): @@ -232,6 +321,7 @@ class VPCs(BaseModel): cidr_block: str flow_log: bool = False region: str + subnets: list[VpcSubnet] = [] tags: Optional[list] = [] diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_different_az/__init__.py b/prowler/providers/aws/services/vpc/vpc_subnet_different_az/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.metadata.json b/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.metadata.json new file mode 100644 index 00000000..3dfa5a22 --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "vpc_subnet_different_az", + "CheckTitle": "Ensure all vpc has subnets in more than one availability zone", + "CheckType": [ + "Infrastructure Security" + ], + "ServiceName": "vpc", + "SubServiceName": "subnet", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Ensure all vpc has subnets in more than one availability zone", + "Risk": "", + "RelatedUrl": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Scenario2.html", + "Remediation": { + "Code": { + "CLI": "aws ec2 create-subnet", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure all vpc has subnets in more than one availability zone", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.py b/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.py new file mode 100644 index 00000000..5116749e --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az.py @@ -0,0 +1,32 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_subnet_different_az(Check): + def execute(self): + findings = [] + for vpc in vpc_client.vpcs: + report = Check_Report_AWS(self.metadata()) + report.region = vpc.region + report.resource_tags = vpc.tags + report.status = "FAIL" + report.status_extended = f"VPC {vpc.id} has no subnets." + report.resource_id = vpc.id + if vpc.subnets: + availability_zone = None + for subnet in vpc.subnets: + if ( + availability_zone + and subnet.availability_zone != availability_zone + ): + report.status = "PASS" + report.status_extended = f"VPC {vpc.id} has subnets in more than one availability zone." + break + availability_zone = subnet.availability_zone + report.status_extended = ( + f"VPC {vpc.id} has only subnets in {availability_zone}." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/__init__.py b/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.metadata.json b/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.metadata.json new file mode 100644 index 00000000..75660a26 --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "vpc_subnet_separate_private_public", + "CheckTitle": "Ensure all vpc has public and private subnets defined", + "CheckType": [ + "Infrastructure Security" + ], + "ServiceName": "vpc", + "SubServiceName": "subnet", + "ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id", + "Severity": "medium", + "ResourceType": "AwsEc2Vpc", + "Description": "Ensure all vpc has public and private subnets defined", + "Risk": "", + "RelatedUrl": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Scenario2.html", + "Remediation": { + "Code": { + "CLI": "aws ec2 create-subnet", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure all vpc has public and private subnets defined", + "Url": "" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.py b/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.py new file mode 100644 index 00000000..2f3acd0a --- /dev/null +++ b/prowler/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public.py @@ -0,0 +1,36 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class vpc_subnet_separate_private_public(Check): + def execute(self): + findings = [] + for vpc in vpc_client.vpcs: + report = Check_Report_AWS(self.metadata()) + report.region = vpc.region + report.resource_tags = vpc.tags + report.status = "FAIL" + report.status_extended = f"VPC {vpc.id} has no subnets." + report.resource_id = vpc.id + if vpc.subnets: + public = False + private = False + for subnet in vpc.subnets: + if subnet.public: + public = True + report.status_extended = ( + f"VPC {vpc.id} has only public subnets." + ) + if not subnet.public: + private = True + report.status_extended = ( + f"VPC {vpc.id} has only private subnets." + ) + if public and private: + report.status = "PASS" + report.status_extended = ( + f"VPC {vpc.id} has private and public subnets." + ) + findings.append(report) + + return findings diff --git a/tests/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions_test.py b/tests/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions_test.py new file mode 100644 index 00000000..2b1f1b3e --- /dev/null +++ b/tests/providers/aws/services/vpc/vpc_different_regions/vpc_different_regions_test.py @@ -0,0 +1,117 @@ +from unittest import mock + +from boto3 import client, session +from moto import mock_ec2 + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = "123456789012" + + +class Test_vpc_different_regions: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=["us-east-1", "eu-west-1"], + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + + @mock_ec2 + def test_vpc_different_regions(self): + # VPC Region 1 + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC Region 2 + ec2_client_eu = client("ec2", region_name="eu-west-1") + vpc_eu = ec2_client_eu.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_different_regions.vpc_different_regions.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_different_regions.vpc_different_regions import ( + vpc_different_regions, + ) + + check = vpc_different_regions() + result = check.execute() + + assert len(result) == 1 + assert result[0].region in ["us-east-1", "eu-west-1"] + assert ( + result[0].status_extended == "VPCs found in more than one region." + ) + assert result[0].resource_id in [ + vpc["Vpc"]["VpcId"], + vpc_eu["Vpc"]["VpcId"], + ] + assert result[0].resource_tags == [] + assert result[0].status == "PASS" + + @mock_ec2 + def test_vpc_only_one_regions(self): + ec2_client = client("ec2", region_name=AWS_REGION) + # VPC Region 1 + vpc = ec2_client.create_vpc( + CidrBlock="172.28.6.0/24", InstanceTenancy="default" + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_different_regions.vpc_different_regions.vpc_client", + new=VPC(current_audit_info), + ): + # Test Check + from prowler.providers.aws.services.vpc.vpc_different_regions.vpc_different_regions import ( + vpc_different_regions, + ) + + check = vpc_different_regions() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].region == AWS_REGION + assert ( + result[0].status_extended + == f"VPCs found only in one region {AWS_REGION}." + ) + assert result[0].resource_id == vpc["Vpc"]["VpcId"] + assert result[0].resource_tags == [] diff --git a/tests/providers/aws/services/vpc/vpc_service_test.py b/tests/providers/aws/services/vpc/vpc_service_test.py index 82405145..5e2fa333 100644 --- a/tests/providers/aws/services/vpc/vpc_service_test.py +++ b/tests/providers/aws/services/vpc/vpc_service_test.py @@ -282,3 +282,34 @@ class Test_VPC_Service: assert ( len(vpc.vpc_endpoint_services) == 0 ) # Wait until this issue is fixed https://github.com/spulec/moto/issues/5605 + + # Test VPC Describe VPC Subnets + @mock_ec2 + def test__describe_vpc_subnets__(self): + # Generate VPC Client + ec2_client = client("ec2", region_name=AWS_REGION) + # Create VPC + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + subnet = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + # VPC client for this test class + audit_info = self.set_mocked_audit_info() + vpc = VPC(audit_info) + assert ( + len(vpc.vpcs) == 3 + ) # Number of AWS regions + created VPC, one default VPC per region + for vpc in vpc.vpcs: + if vpc.cidr_block == "172.28.7.0/24": + assert vpc.subnets[0].id == subnet["Subnet"]["SubnetId"] + assert vpc.subnets[0].default is False + assert vpc.subnets[0].vpc_id == vpc.id + assert vpc.subnets[0].cidr_block == "172.28.7.192/26" + assert vpc.subnets[0].availability_zone == f"{AWS_REGION}a" + assert vpc.subnets[0].public is False + assert vpc.subnets[0].region == AWS_REGION + assert vpc.subnets[0].tags is None diff --git a/tests/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az_test.py b/tests/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az_test.py new file mode 100644 index 00000000..1b0c0d4a --- /dev/null +++ b/tests/providers/aws/services/vpc/vpc_subnet_different_az/vpc_subnet_different_az_test.py @@ -0,0 +1,183 @@ +from unittest import mock + +from boto3 import client, session +from moto import mock_ec2 + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = "123456789012" + + +class Test_vpc_subnet_different_az: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + + @mock_ec2 + def test_vpc_subnet_different_az(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC AZ 1 + ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + + # VPC AZ 2 + ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.0/26", + AvailabilityZone=f"{AWS_REGION}b", + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az import ( + vpc_subnet_different_az, + ) + + check = vpc_subnet_different_az() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "PASS" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has subnets in more than one availability zone." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False + + @mock_ec2 + def test_vpc_subnet_same_az(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC AZ 1 + ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + + # VPC AZ 2 + ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.0/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az import ( + vpc_subnet_different_az, + ) + + check = vpc_subnet_different_az() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "FAIL" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has only subnets in {AWS_REGION}a." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False + + @mock_ec2 + def test_vpc_no_subnets(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_different_az.vpc_subnet_different_az import ( + vpc_subnet_different_az, + ) + + check = vpc_subnet_different_az() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "FAIL" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has no subnets." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False diff --git a/tests/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public_test.py b/tests/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public_test.py new file mode 100644 index 00000000..f3ae1e54 --- /dev/null +++ b/tests/providers/aws/services/vpc/vpc_subnet_separate_private_public/vpc_subnet_separate_private_public_test.py @@ -0,0 +1,229 @@ +from unittest import mock + +from boto3 import client, session +from moto import mock_ec2 + +from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info + +AWS_REGION = "us-east-1" +AWS_ACCOUNT_NUMBER = "123456789012" + + +class Test_vpc_subnet_separate_private_public: + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=AWS_ACCOUNT_NUMBER, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=["us-east-1", "eu-west-1"], + organizations_metadata=None, + audit_resources=None, + ) + + return audit_info + + @mock_ec2 + def test_vpc_subnet_only_private(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC Private + subnet_private = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + route_table_private = ec2_client.create_route_table( + VpcId=vpc["Vpc"]["VpcId"], + ) + ec2_client.create_route( + DestinationCidrBlock="10.10.10.0", + RouteTableId=route_table_private["RouteTable"]["RouteTableId"], + ) + ec2_client.associate_route_table( + RouteTableId=route_table_private["RouteTable"]["RouteTableId"], + SubnetId=subnet_private["Subnet"]["SubnetId"], + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public import ( + vpc_subnet_separate_private_public, + ) + + check = vpc_subnet_separate_private_public() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "FAIL" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has only private subnets." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False + + @mock_ec2 + def test_vpc_subnet_only_public(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC Public + subnet_public = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + route_table_public = ec2_client.create_route_table( + VpcId=vpc["Vpc"]["VpcId"], + ) + igw = ec2_client.create_internet_gateway() + ec2_client.create_route( + DestinationCidrBlock="0.0.0.0", + RouteTableId=route_table_public["RouteTable"]["RouteTableId"], + GatewayId=igw["InternetGateway"]["InternetGatewayId"], + ) + ec2_client.associate_route_table( + RouteTableId=route_table_public["RouteTable"]["RouteTableId"], + SubnetId=subnet_public["Subnet"]["SubnetId"], + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public import ( + vpc_subnet_separate_private_public, + ) + + check = vpc_subnet_separate_private_public() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "FAIL" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has only public subnets." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False + + @mock_ec2 + def test_vpc_subnet_private_and_public(self): + ec2_client = client("ec2", region_name=AWS_REGION) + vpc = ec2_client.create_vpc( + CidrBlock="172.28.7.0/24", InstanceTenancy="default" + ) + # VPC Private + subnet_private = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.192/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + route_table_private = ec2_client.create_route_table( + VpcId=vpc["Vpc"]["VpcId"], + ) + ec2_client.create_route( + DestinationCidrBlock="10.10.10.0", + RouteTableId=route_table_private["RouteTable"]["RouteTableId"], + ) + ec2_client.associate_route_table( + RouteTableId=route_table_private["RouteTable"]["RouteTableId"], + SubnetId=subnet_private["Subnet"]["SubnetId"], + ) + # VPC Public + subnet_public = ec2_client.create_subnet( + VpcId=vpc["Vpc"]["VpcId"], + CidrBlock="172.28.7.0/26", + AvailabilityZone=f"{AWS_REGION}a", + ) + route_table_public = ec2_client.create_route_table( + VpcId=vpc["Vpc"]["VpcId"], + ) + igw = ec2_client.create_internet_gateway() + ec2_client.create_route( + DestinationCidrBlock="0.0.0.0", + RouteTableId=route_table_public["RouteTable"]["RouteTableId"], + GatewayId=igw["InternetGateway"]["InternetGatewayId"], + ) + ec2_client.associate_route_table( + RouteTableId=route_table_public["RouteTable"]["RouteTableId"], + SubnetId=subnet_public["Subnet"]["SubnetId"], + ) + + from prowler.providers.aws.services.vpc.vpc_service import VPC + + current_audit_info = self.set_mocked_audit_info() + + with mock.patch( + "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", + new=current_audit_info, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public.vpc_client", + new=VPC(current_audit_info), + ): + from prowler.providers.aws.services.vpc.vpc_subnet_separate_private_public.vpc_subnet_separate_private_public import ( + vpc_subnet_separate_private_public, + ) + + check = vpc_subnet_separate_private_public() + results = check.execute() + + found = False + for result in results: + if result.resource_id == vpc["Vpc"]["VpcId"]: + found = True + assert result.status == "PASS" + assert ( + result.status_extended + == f"VPC {vpc['Vpc']['VpcId']} has private and public subnets." + ) + assert result.resource_id == vpc["Vpc"]["VpcId"] + assert result.resource_tags == [] + assert result.region == AWS_REGION + if not found: + assert False