diff --git a/prowler/providers/aws/services/networkfirewall/__init__.py b/prowler/providers/aws/services/networkfirewall/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/networkfirewall/networkfirewall_client.py b/prowler/providers/aws/services/networkfirewall/networkfirewall_client.py new file mode 100644 index 00000000..685947ac --- /dev/null +++ b/prowler/providers/aws/services/networkfirewall/networkfirewall_client.py @@ -0,0 +1,6 @@ +from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info +from prowler.providers.aws.services.networkfirewall.networkfirewall_service import ( + NetworkFirewall, +) + +networkfirewall_client = NetworkFirewall(current_audit_info) diff --git a/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/__init__.py b/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.metadata.json b/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.metadata.json new file mode 100644 index 00000000..c627fcf2 --- /dev/null +++ b/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.metadata.json @@ -0,0 +1,30 @@ +{ + "Provider": "aws", + "CheckID": "networkfirewall_in_all_vpc", + "CheckTitle": "Ensure all VPCs have Network Firewall enabled", + "CheckType": [], + "ServiceName": "network-firewall", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:network-firewall::account-id:firewall/firewall-name", + "Severity": "medium", + "ResourceType": "Other", + "Description": "Ensure all VPCs have Network Firewall enabled", + "Risk": "Without a network firewall, it can be difficult to monitor and control traffic within the VPC. This can make it harder to detect and prevent attacks or unauthorized access to resources.", + "RelatedUrl": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/setting-up.html", + "Remediation": { + "Code": { + "CLI": "aws network-firewall create-firewall --firewall-name --vpc-id ", + "NativeIaC": "", + "Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/NetworkFirewall/network-firewall-in-use.html", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure all VPCs have Network Firewall enabled", + "Url": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/vpc-config.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.py b/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.py new file mode 100644 index 00000000..7319f6cc --- /dev/null +++ b/prowler/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc.py @@ -0,0 +1,31 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.networkfirewall.networkfirewall_client import ( + networkfirewall_client, +) +from prowler.providers.aws.services.vpc.vpc_client import vpc_client + + +class networkfirewall_in_all_vpc(Check): + def execute(self): + findings = [] + for vpc in vpc_client.vpcs: + report = Check_Report_AWS(self.metadata()) + report.region = vpc.region + report.resource_id = vpc.id + report.resource_arn = "" + report.resource_tags = vpc.tags + report.status = "FAIL" + report.status_extended = ( + f"VPC {vpc.id} does not have Network Firewall enabled." + ) + for firewall in networkfirewall_client.network_firewalls: + if firewall.vpc_id == vpc.id: + report.status = "PASS" + report.status_extended = ( + f"VPC {vpc.id} has Network Firewall enabled." + ) + break + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/networkfirewall/networkfirewall_service.py b/prowler/providers/aws/services/networkfirewall/networkfirewall_service.py new file mode 100644 index 00000000..c6da6c64 --- /dev/null +++ b/prowler/providers/aws/services/networkfirewall/networkfirewall_service.py @@ -0,0 +1,94 @@ +import threading + +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.lib.scan_filters.scan_filters import is_resource_filtered +from prowler.providers.aws.aws_provider import generate_regional_clients + + +################## NetworkFirewall +class NetworkFirewall: + def __init__(self, audit_info): + self.service = "network-firewall" + self.session = audit_info.audit_session + self.audited_account = audit_info.audited_account + self.audited_partition = audit_info.audited_partition + self.audit_resources = audit_info.audit_resources + self.regional_clients = generate_regional_clients(self.service, audit_info) + # If the region is not set in the audit profile, + # we pick the first region from the regional clients list + self.region = ( + audit_info.profile_region + if audit_info.profile_region + else list(self.regional_clients.keys())[0] + ) + self.network_firewalls = [] + self.__threading_call__(self.__list_firewalls__) + self.__describe_firewall__() + + def __get_session__(self): + return self.session + + def __threading_call__(self, call): + threads = [] + for regional_client in self.regional_clients.values(): + threads.append(threading.Thread(target=call, args=(regional_client,))) + for t in threads: + t.start() + for t in threads: + t.join() + + def __list_firewalls__(self, regional_client): + logger.info("Network Firewall - Listing Network Firewalls...") + try: + list_network_firewalls_paginator = regional_client.get_paginator( + "list_firewalls" + ) + for page in list_network_firewalls_paginator.paginate(): + for network_firewall in page["Firewalls"]: + if not self.audit_resources or ( + is_resource_filtered( + network_firewall["FirewallArn"], self.audit_resources + ) + ): + self.network_firewalls.append( + Firewall( + arn=network_firewall.get("FirewallArn"), + region=regional_client.region, + name=network_firewall.get("FirewallName"), + ) + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def __describe_firewall__(self): + logger.info("Network Firewall - Describe Network Firewalls...") + try: + for network_firewall in self.network_firewalls: + regional_client = self.regional_clients[network_firewall.region] + describe_firewall = regional_client.describe_firewall( + FirewallArn=network_firewall.arn + )["Firewall"] + network_firewall.policy_arn = describe_firewall.get("FirewallPolicyArn") + network_firewall.vpc_id = describe_firewall.get("VpcId") + network_firewall.tags = describe_firewall.get("Tags") + network_firewall.encryption_type = describe_firewall.get( + "EncryptionConfiguration" + ).get("Type") + except Exception as error: + logger.error( + f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" + ) + + +class Firewall(BaseModel): + arn: str + name: str + region: str + policy_arn: str = None + vpc_id: str = None + tags: list = [] + encryption_type: str = None diff --git a/tests/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc_test.py b/tests/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc_test.py new file mode 100644 index 00000000..6f28f8ae --- /dev/null +++ b/tests/providers/aws/services/networkfirewall/networkfirewall_in_all_vpc/networkfirewall_in_all_vpc_test.py @@ -0,0 +1,257 @@ +from unittest import mock + +from prowler.providers.aws.services.networkfirewall.networkfirewall_service import ( + Firewall, +) +from prowler.providers.aws.services.vpc.vpc_service import VPCs, VpcSubnet + +AWS_REGION = "us-east-1" +FIREWALL_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall/my-firewall" +FIREWALL_NAME = "my-firewall" +VPC_ID_PROTECTED = "vpc-12345678901234567" +VPC_ID_UNPROTECTED = "vpc-12345678901234568" +POLICY_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall-policy/my-policy" + + +class Test_networkfirewall_in_all_vpc: + def test_no_vpcs(self): + networkfirewall_client = mock.MagicMock + networkfirewall_client.region = AWS_REGION + networkfirewall_client.network_firewalls = [] + vpc_client = mock.MagicMock + vpc_client.region = AWS_REGION + vpc_client.vpcs = [] + with mock.patch( + "prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall", + new=networkfirewall_client, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ): + # Test Check + from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import ( + networkfirewall_in_all_vpc, + ) + + check = networkfirewall_in_all_vpc() + result = check.execute() + + assert len(result) == 0 + + def test_vpcs_with_firewall_all(self): + networkfirewall_client = mock.MagicMock + networkfirewall_client.region = AWS_REGION + networkfirewall_client.network_firewalls = [ + Firewall( + arn=FIREWALL_ARN, + name=FIREWALL_NAME, + region=AWS_REGION, + policy_arn=POLICY_ARN, + vpc_id=VPC_ID_PROTECTED, + tags=[], + encryption_type="CUSTOMER_KMS", + ) + ] + vpc_client = mock.MagicMock + vpc_client.region = AWS_REGION + vpc_client.vpcs = [ + VPCs( + id=VPC_ID_PROTECTED, + default=False, + cidr_block="192.168.0.0/16", + flow_log=False, + region=AWS_REGION, + subnets=[ + VpcSubnet( + id="subnet-123456789", + default=False, + vpc_id=VPC_ID_PROTECTED, + cidr_block="192.168.0.0/24", + availability_zone="us-east-1a", + public=False, + region=AWS_REGION, + tags=[], + ) + ], + tags=[], + ) + ] + with mock.patch( + "prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall", + new=networkfirewall_client, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ): + # Test Check + from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import ( + networkfirewall_in_all_vpc, + ) + + check = networkfirewall_in_all_vpc() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == VPC_ID_PROTECTED + assert result[0].resource_tags == [] + assert result[0].resource_arn == "" + + def test_vpcs_without_firewall(self): + networkfirewall_client = mock.MagicMock + networkfirewall_client.region = AWS_REGION + networkfirewall_client.network_firewalls = [] + vpc_client = mock.MagicMock + vpc_client.region = AWS_REGION + vpc_client.vpcs = [ + VPCs( + id=VPC_ID_UNPROTECTED, + default=False, + cidr_block="192.168.0.0/16", + flow_log=False, + region=AWS_REGION, + subnets=[ + VpcSubnet( + id="subnet-123456789", + default=False, + vpc_id=VPC_ID_UNPROTECTED, + cidr_block="192.168.0.0/24", + availability_zone="us-east-1a", + public=False, + region=AWS_REGION, + tags=[], + ) + ], + tags=[], + ) + ] + with mock.patch( + "prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall", + new=networkfirewall_client, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ): + # Test Check + from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import ( + networkfirewall_in_all_vpc, + ) + + check = networkfirewall_in_all_vpc() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled." + ) + assert result[0].region == AWS_REGION + assert result[0].resource_id == VPC_ID_UNPROTECTED + assert result[0].resource_tags == [] + assert result[0].resource_arn == "" + + def test_vpcs_with_and_without_firewall(self): + networkfirewall_client = mock.MagicMock + networkfirewall_client.region = AWS_REGION + networkfirewall_client.network_firewalls = [ + Firewall( + arn=FIREWALL_ARN, + name=FIREWALL_NAME, + region=AWS_REGION, + policy_arn=POLICY_ARN, + vpc_id=VPC_ID_PROTECTED, + tags=[], + encryption_type="CUSTOMER_KMS", + ) + ] + vpc_client = mock.MagicMock + vpc_client.region = AWS_REGION + vpc_client.vpcs = [ + VPCs( + id=VPC_ID_UNPROTECTED, + default=False, + cidr_block="192.168.0.0/16", + flow_log=False, + region=AWS_REGION, + subnets=[ + VpcSubnet( + id="subnet-123456789", + default=False, + vpc_id=VPC_ID_UNPROTECTED, + cidr_block="192.168.0.0/24", + availability_zone="us-east-1a", + public=False, + region=AWS_REGION, + tags=[], + ) + ], + tags=[], + ), + VPCs( + id=VPC_ID_PROTECTED, + default=False, + cidr_block="192.168.0.0/16", + flow_log=False, + region=AWS_REGION, + subnets=[ + VpcSubnet( + id="subnet-123456789", + default=False, + vpc_id=VPC_ID_PROTECTED, + cidr_block="192.168.0.0/24", + availability_zone="us-east-1a", + public=False, + region=AWS_REGION, + tags=[], + ) + ], + tags=[], + ), + ] + with mock.patch( + "prowler.providers.aws.services.networkfirewall.networkfirewall_service.NetworkFirewall", + new=networkfirewall_client, + ): + with mock.patch( + "prowler.providers.aws.services.vpc.vpc_service.VPC", + new=vpc_client, + ): + # Test Check + from prowler.providers.aws.services.networkfirewall.networkfirewall_in_all_vpc.networkfirewall_in_all_vpc import ( + networkfirewall_in_all_vpc, + ) + + check = networkfirewall_in_all_vpc() + result = check.execute() + + assert len(result) == 2 + for r in result: + if r.resource_id == VPC_ID_PROTECTED: + assert r.status == "PASS" + assert ( + r.status_extended + == f"VPC {VPC_ID_PROTECTED} has Network Firewall enabled." + ) + assert r.region == AWS_REGION + assert r.resource_id == VPC_ID_PROTECTED + assert r.resource_tags == [] + assert r.resource_arn == "" + if r.resource_id == VPC_ID_UNPROTECTED: + assert r.status == "FAIL" + assert ( + r.status_extended + == f"VPC {VPC_ID_UNPROTECTED} does not have Network Firewall enabled." + ) + assert r.region == AWS_REGION + assert r.resource_id == VPC_ID_UNPROTECTED + assert r.resource_tags == [] + assert r.resource_arn == "" diff --git a/tests/providers/aws/services/networkfirewall/networkfirewall_service_test.py b/tests/providers/aws/services/networkfirewall/networkfirewall_service_test.py new file mode 100644 index 00000000..60bceaf0 --- /dev/null +++ b/tests/providers/aws/services/networkfirewall/networkfirewall_service_test.py @@ -0,0 +1,124 @@ +from unittest.mock import patch + +import botocore +from boto3 import session + +from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info +from prowler.providers.aws.services.networkfirewall.networkfirewall_service import ( + NetworkFirewall, +) + +# Mock Test Region +AWS_REGION = "us-east-1" +FIREWALL_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall/my-firewall" +FIREWALL_NAME = "my-firewall" +VPC_ID = "vpc-12345678901234567" +POLICY_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall-policy/my-policy" + +# Mocking Calls +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwargs): + """We have to mock every AWS API call using Boto3""" + if operation_name == "ListFirewalls": + return { + "Firewalls": [ + {"FirewallName": FIREWALL_NAME, "FirewallArn": FIREWALL_ARN}, + ] + } + if operation_name == "DescribeFirewall": + return { + "Firewall": { + "DeleteProtection": True, + "Description": "Description of the firewall", + "EncryptionConfiguration": { + "KeyId": "my-key-id", + "Type": "CUSTOMER_KMS", + }, + "FirewallArn": FIREWALL_ARN, + "FirewallId": "firewall-id", + "FirewallName": FIREWALL_NAME, + "FirewallPolicyArn": POLICY_ARN, + "FirewallPolicyChangeProtection": False, + "SubnetChangeProtection": False, + "SubnetMappings": [{"IPAddressType": "string", "SubnetId": "string"}], + "Tags": [{"Key": "test_tag", "Value": "test_value"}], + "VpcId": VPC_ID, + } + } + + return make_api_call(self, operation_name, kwargs) + + +def mock_generate_regional_clients(service, audit_info): + regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) + regional_client.region = AWS_REGION + return {AWS_REGION: regional_client} + + +# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +@patch( + "prowler.providers.aws.services.networkfirewall.networkfirewall_service.generate_regional_clients", + new=mock_generate_regional_clients, +) +class Test_NetworkFirewall_Service: + + # Mocked Audit Info + def set_mocked_audit_info(self): + audit_info = AWS_Audit_Info( + session_config=None, + original_session=None, + audit_session=session.Session( + profile_name=None, + botocore_session=None, + ), + audited_account=None, + audited_user_id=None, + audited_partition="aws", + audited_identity_arn=None, + profile=None, + profile_region=None, + credentials=None, + assumed_role_info=None, + audited_regions=None, + organizations_metadata=None, + audit_resources=None, + ) + return audit_info + + def test__get_client__(self): + audit_info = self.set_mocked_audit_info() + networkfirewall = NetworkFirewall(audit_info) + assert ( + networkfirewall.regional_clients[AWS_REGION].__class__.__name__ + == "NetworkFirewall" + ) + + def test__get_service__(self): + audit_info = self.set_mocked_audit_info() + networkfirewall = NetworkFirewall(audit_info) + assert networkfirewall.service == "network-firewall" + + def test__list_firewalls__(self): + audit_info = self.set_mocked_audit_info() + networkfirewall = NetworkFirewall(audit_info) + assert len(networkfirewall.network_firewalls) == 1 + assert networkfirewall.network_firewalls[0].arn == FIREWALL_ARN + assert networkfirewall.network_firewalls[0].region == AWS_REGION + assert networkfirewall.network_firewalls[0].name == FIREWALL_NAME + + def test__describe_firewall__(self): + audit_info = self.set_mocked_audit_info() + networkfirewall = NetworkFirewall(audit_info) + assert len(networkfirewall.network_firewalls) == 1 + assert networkfirewall.network_firewalls[0].arn == FIREWALL_ARN + assert networkfirewall.network_firewalls[0].region == AWS_REGION + assert networkfirewall.network_firewalls[0].name == FIREWALL_NAME + assert networkfirewall.network_firewalls[0].policy_arn == POLICY_ARN + assert networkfirewall.network_firewalls[0].vpc_id == VPC_ID + assert networkfirewall.network_firewalls[0].tags == [ + {"Key": "test_tag", "Value": "test_value"} + ] + assert networkfirewall.network_firewalls[0].encryption_type == "CUSTOMER_KMS"