test(audit_info): refactor neptune (#3155)

This commit is contained in:
Nacho Rivera
2023-12-05 13:48:32 +01:00
committed by GitHub
parent dbc2c481dc
commit 0262f8757a
2 changed files with 52 additions and 110 deletions

View File

@@ -1,16 +1,18 @@
from unittest import mock from unittest import mock
from boto3 import client, session from boto3 import client
from mock import MagicMock, patch from mock import MagicMock, patch
from moto import mock_ec2, mock_neptune from moto import mock_ec2, mock_neptune
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.neptune.neptune_service import Neptune from prowler.providers.aws.services.neptune.neptune_service import Neptune
from prowler.providers.aws.services.vpc.vpc_service import VpcSubnet from prowler.providers.aws.services.vpc.vpc_service import VpcSubnet
from prowler.providers.common.models import Audit_Metadata from tests.providers.aws.audit_info_utils import (
AWS_REGION_US_EAST_1,
AWS_REGION_US_EAST_1_AZA,
AWS_REGION_US_EAST_1_AZB,
set_mocked_aws_audit_info,
)
from tests.providers.aws.services.neptune.neptune_service_test import ( from tests.providers.aws.services.neptune.neptune_service_test import (
AWS_REGION_AZ1,
AWS_REGION_AZ2,
NEPTUNE_CLUSTER_NAME, NEPTUNE_CLUSTER_NAME,
NEPTUNE_CLUSTER_TAGS, NEPTUNE_CLUSTER_TAGS,
NEPTUNE_ENGINE, NEPTUNE_ENGINE,
@@ -19,46 +21,12 @@ from tests.providers.aws.services.neptune.neptune_service_test import (
mock_make_api_call, mock_make_api_call,
) )
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
AWS_REGION = "us-east-1"
VPC_ID = "vpc-12345678901234567" VPC_ID = "vpc-12345678901234567"
# Patch every AWS call using Boto3 # Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_neptune_cluster_uses_public_subnet: class Test_neptune_cluster_uses_public_subnet:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=[AWS_REGION],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_neptune @mock_neptune
@mock_ec2 @mock_ec2
def test_neptune_no_clusters(self): def test_neptune_no_clusters(self):
@@ -66,7 +34,7 @@ class Test_neptune_cluster_uses_public_subnet:
vpc_client = MagicMock vpc_client = MagicMock
vpc_client.vpc_subnets = {} vpc_client.vpc_subnets = {}
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -98,10 +66,10 @@ class Test_neptune_cluster_uses_public_subnet:
default=False, default=False,
vpc_id=VPC_ID, vpc_id=VPC_ID,
cidr_block="192.168.0.0/24", cidr_block="192.168.0.0/24",
availability_zone=AWS_REGION_AZ1, availability_zone=AWS_REGION_US_EAST_1_AZA,
public=False, public=False,
nat_gateway=False, nat_gateway=False,
region=AWS_REGION, region=AWS_REGION_US_EAST_1,
tags=[], tags=[],
mapPublicIpOnLaunch=False, mapPublicIpOnLaunch=False,
) )
@@ -112,19 +80,19 @@ class Test_neptune_cluster_uses_public_subnet:
default=False, default=False,
vpc_id=VPC_ID, vpc_id=VPC_ID,
cidr_block="192.168.0.1/24", cidr_block="192.168.0.1/24",
availability_zone=AWS_REGION_AZ2, availability_zone=AWS_REGION_US_EAST_1_AZB,
public=False, public=False,
nat_gateway=False, nat_gateway=False,
region=AWS_REGION, region=AWS_REGION_US_EAST_1,
tags=[], tags=[],
mapPublicIpOnLaunch=False, mapPublicIpOnLaunch=False,
) )
# Neptune client # Neptune client
neptune_client = client("neptune", region_name=AWS_REGION) neptune_client = client("neptune", region_name=AWS_REGION_US_EAST_1)
# Create Neptune Cluster # Create Neptune Cluster
cluster = neptune_client.create_db_cluster( cluster = neptune_client.create_db_cluster(
AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], AvailabilityZones=[AWS_REGION_US_EAST_1_AZA, AWS_REGION_US_EAST_1_AZB],
BackupRetentionPeriod=1, BackupRetentionPeriod=1,
CopyTagsToSnapshot=True, CopyTagsToSnapshot=True,
Engine=NEPTUNE_ENGINE, Engine=NEPTUNE_ENGINE,
@@ -139,7 +107,7 @@ class Test_neptune_cluster_uses_public_subnet:
cluster_arn = cluster["DBClusterArn"] cluster_arn = cluster["DBClusterArn"]
cluster_id = cluster["DbClusterResourceId"] cluster_id = cluster["DbClusterResourceId"]
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info, new=audit_info,
@@ -162,7 +130,7 @@ class Test_neptune_cluster_uses_public_subnet:
result[0].status_extended result[0].status_extended
== f"Cluster {cluster_id} is not using public subnets." == f"Cluster {cluster_id} is not using public subnets."
) )
assert result[0].region == AWS_REGION assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == cluster_id assert result[0].resource_id == cluster_id
assert result[0].resource_arn == cluster_arn assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS
@@ -179,10 +147,10 @@ class Test_neptune_cluster_uses_public_subnet:
default=False, default=False,
vpc_id=VPC_ID, vpc_id=VPC_ID,
cidr_block="192.168.0.0/24", cidr_block="192.168.0.0/24",
availability_zone=AWS_REGION_AZ1, availability_zone=AWS_REGION_US_EAST_1_AZA,
public=True, public=True,
nat_gateway=False, nat_gateway=False,
region=AWS_REGION, region=AWS_REGION_US_EAST_1,
tags=[], tags=[],
mapPublicIpOnLaunch=False, mapPublicIpOnLaunch=False,
) )
@@ -193,19 +161,19 @@ class Test_neptune_cluster_uses_public_subnet:
default=False, default=False,
vpc_id=VPC_ID, vpc_id=VPC_ID,
cidr_block="192.168.0.1/24", cidr_block="192.168.0.1/24",
availability_zone=AWS_REGION_AZ2, availability_zone=AWS_REGION_US_EAST_1_AZB,
public=True, public=True,
nat_gateway=False, nat_gateway=False,
region=AWS_REGION, region=AWS_REGION_US_EAST_1,
tags=[], tags=[],
mapPublicIpOnLaunch=False, mapPublicIpOnLaunch=False,
) )
# Neptune client # Neptune client
neptune_client = client("neptune", region_name=AWS_REGION) neptune_client = client("neptune", region_name=AWS_REGION_US_EAST_1)
# Create Neptune Cluster # Create Neptune Cluster
cluster = neptune_client.create_db_cluster( cluster = neptune_client.create_db_cluster(
AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], AvailabilityZones=[AWS_REGION_US_EAST_1_AZA, AWS_REGION_US_EAST_1_AZB],
BackupRetentionPeriod=1, BackupRetentionPeriod=1,
CopyTagsToSnapshot=True, CopyTagsToSnapshot=True,
Engine=NEPTUNE_ENGINE, Engine=NEPTUNE_ENGINE,
@@ -220,7 +188,7 @@ class Test_neptune_cluster_uses_public_subnet:
cluster_arn = cluster["DBClusterArn"] cluster_arn = cluster["DBClusterArn"]
cluster_id = cluster["DbClusterResourceId"] cluster_id = cluster["DbClusterResourceId"]
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info", "prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info, new=audit_info,
@@ -243,7 +211,7 @@ class Test_neptune_cluster_uses_public_subnet:
result[0].status_extended result[0].status_extended
== f"Cluster {cluster_id} is using subnet-1, subnet-2 public subnets." == f"Cluster {cluster_id} is using subnet-1, subnet-2 public subnets."
) )
assert result[0].region == AWS_REGION assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == cluster_id assert result[0].resource_id == cluster_id
assert result[0].resource_arn == cluster_arn assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS assert result[0].resource_tags == NEPTUNE_CLUSTER_TAGS

View File

@@ -1,18 +1,16 @@
import botocore import botocore
from boto3 import client, session from boto3 import client
from mock import patch from mock import patch
from moto import mock_neptune from moto import mock_neptune
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.neptune.neptune_service import Cluster, Neptune from prowler.providers.aws.services.neptune.neptune_service import Cluster, Neptune
from prowler.providers.common.models import Audit_Metadata from tests.providers.aws.audit_info_utils import (
AWS_ACCOUNT_NUMBER,
AWS_ACCOUNT_NUMBER = "123456789012" AWS_REGION_US_EAST_1,
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root" AWS_REGION_US_EAST_1_AZA,
AWS_REGION_US_EAST_1_AZB,
AWS_REGION = "us-east-1" set_mocked_aws_audit_info,
AWS_REGION_AZ1 = "us-east-1a" )
AWS_REGION_AZ2 = "us-east-b"
SUBNET_GROUP_NAME = "default" SUBNET_GROUP_NAME = "default"
SUBNET_1 = "subnet-1" SUBNET_1 = "subnet-1"
@@ -48,16 +46,20 @@ def mock_make_api_call(self, operation_name, kwargs):
"Subnets": [ "Subnets": [
{ {
"SubnetIdentifier": "subnet-1", "SubnetIdentifier": "subnet-1",
"SubnetAvailabilityZone": {"Name": AWS_REGION_AZ1}, "SubnetAvailabilityZone": {
"Name": AWS_REGION_US_EAST_1_AZA
},
"SubnetStatus": "Active", "SubnetStatus": "Active",
}, },
{ {
"SubnetIdentifier": "subnet-2", "SubnetIdentifier": "subnet-2",
"SubnetAvailabilityZone": {"Name": AWS_REGION_AZ2}, "SubnetAvailabilityZone": {
"Name": AWS_REGION_US_EAST_1_AZB
},
"SubnetStatus": "Active", "SubnetStatus": "Active",
}, },
], ],
"DBSubnetGroupArn": f"arn:aws:rds:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:subgrp:{SUBNET_GROUP_NAME}", "DBSubnetGroupArn": f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:subgrp:{SUBNET_GROUP_NAME}",
} }
] ]
} }
@@ -68,9 +70,11 @@ def mock_make_api_call(self, operation_name, kwargs):
def mock_generate_regional_clients(service, audit_info, _): def mock_generate_regional_clients(service, audit_info, _):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION) regional_client = audit_info.audit_session.client(
regional_client.region = AWS_REGION service, region_name=AWS_REGION_US_EAST_1
return {AWS_REGION: regional_client} )
regional_client.region = AWS_REGION_US_EAST_1
return {AWS_REGION_US_EAST_1: regional_client}
@patch( @patch(
@@ -80,62 +84,32 @@ def mock_generate_regional_clients(service, audit_info, _):
# Patch every AWS call using Boto3 # Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_Neptune_Service: class Test_Neptune_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=AWS_ACCOUNT_ARN,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test Neptune Service # Test Neptune Service
@mock_neptune @mock_neptune
def test_service(self): def test_service(self):
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
neptune = Neptune(audit_info) neptune = Neptune(audit_info)
assert neptune.service == "neptune" assert neptune.service == "neptune"
# Test Neptune Client] # Test Neptune Client]
@mock_neptune @mock_neptune
def test_client(self): def test_client(self):
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
neptune = Neptune(audit_info) neptune = Neptune(audit_info)
assert neptune.client.__class__.__name__ == "Neptune" assert neptune.client.__class__.__name__ == "Neptune"
# Test Neptune Session # Test Neptune Session
@mock_neptune @mock_neptune
def test__get_session__(self): def test__get_session__(self):
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
neptune = Neptune(audit_info) neptune = Neptune(audit_info)
assert neptune.session.__class__.__name__ == "Session" assert neptune.session.__class__.__name__ == "Session"
# Test Neptune Session # Test Neptune Session
@mock_neptune @mock_neptune
def test_audited_account(self): def test_audited_account(self):
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
neptune = Neptune(audit_info) neptune = Neptune(audit_info)
assert neptune.audited_account == AWS_ACCOUNT_NUMBER assert neptune.audited_account == AWS_ACCOUNT_NUMBER
@@ -143,10 +117,10 @@ class Test_Neptune_Service:
@mock_neptune @mock_neptune
def test_describe_db_clusters(self): def test_describe_db_clusters(self):
# Neptune client # Neptune client
neptune_client = client("neptune", region_name=AWS_REGION) neptune_client = client("neptune", region_name=AWS_REGION_US_EAST_1)
# Create Neptune Cluster # Create Neptune Cluster
cluster = neptune_client.create_db_cluster( cluster = neptune_client.create_db_cluster(
AvailabilityZones=[AWS_REGION_AZ1, AWS_REGION_AZ2], AvailabilityZones=[AWS_REGION_US_EAST_1_AZA, AWS_REGION_US_EAST_1_AZB],
BackupRetentionPeriod=1, BackupRetentionPeriod=1,
CopyTagsToSnapshot=True, CopyTagsToSnapshot=True,
Engine=NEPTUNE_ENGINE, Engine=NEPTUNE_ENGINE,
@@ -161,7 +135,7 @@ class Test_Neptune_Service:
cluster_arn = cluster["DBClusterArn"] cluster_arn = cluster["DBClusterArn"]
cluster_id = cluster["DbClusterResourceId"] cluster_id = cluster["DbClusterResourceId"]
audit_info = self.set_mocked_audit_info() audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
neptune = Neptune(audit_info) neptune = Neptune(audit_info)
assert len(neptune.clusters) == 1 assert len(neptune.clusters) == 1
@@ -170,7 +144,7 @@ class Test_Neptune_Service:
arn=cluster_arn, arn=cluster_arn,
name=NEPTUNE_CLUSTER_NAME, name=NEPTUNE_CLUSTER_NAME,
id=cluster_id, id=cluster_id,
region=AWS_REGION, region=AWS_REGION_US_EAST_1,
db_subnet_group_id=SUBNET_GROUP_NAME, db_subnet_group_id=SUBNET_GROUP_NAME,
subnets=[SUBNET_1, SUBNET_2], subnets=[SUBNET_1, SUBNET_2],
tags=NEPTUNE_CLUSTER_TAGS, tags=NEPTUNE_CLUSTER_TAGS,