test(audit_info): refactor dynamodb (#3129)

This commit is contained in:
Pepe Fagoaga
2023-12-05 10:59:26 +01:00
committed by GitHub
parent 4018221da6
commit b2e1eed684
4 changed files with 84 additions and 189 deletions

View File

@@ -1,53 +1,25 @@
from re import search
from unittest import mock
from boto3 import client, session
from boto3 import client
from moto import mock_dax
from moto.core import DEFAULT_ACCOUNT_ID
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_REGION = "us-east-1"
from tests.providers.aws.audit_info_utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
class Test_dynamodb_accelerator_cluster_encryption_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=DEFAULT_ACCOUNT_ID,
audited_account_arn=f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_dax
def test_dax_no_clusters(self):
from prowler.providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -68,8 +40,8 @@ class Test_dynamodb_accelerator_cluster_encryption_enabled:
@mock_dax
def test_dax_cluster_no_encryption(self):
dax_client = client("dax", region_name=AWS_REGION)
iam_role_arn = f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
dax_client = client("dax", region_name=AWS_REGION_US_EAST_1)
iam_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
cluster = dax_client.create_cluster(
ClusterName="daxcluster",
NodeType="dax.t3.small",
@@ -78,7 +50,9 @@ class Test_dynamodb_accelerator_cluster_encryption_enabled:
)["Cluster"]
from prowler.providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -103,13 +77,13 @@ class Test_dynamodb_accelerator_cluster_encryption_enabled:
)
assert result[0].resource_id == cluster["ClusterName"]
assert result[0].resource_arn == cluster["ClusterArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@mock_dax
def test_dax_cluster_with_encryption(self):
dax_client = client("dax", region_name=AWS_REGION)
iam_role_arn = f"arn:aws:iam::{DEFAULT_ACCOUNT_ID}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
dax_client = client("dax", region_name=AWS_REGION_US_EAST_1)
iam_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
cluster = dax_client.create_cluster(
ClusterName="daxcluster",
NodeType="dax.t3.small",
@@ -119,7 +93,9 @@ class Test_dynamodb_accelerator_cluster_encryption_enabled:
)["Cluster"]
from prowler.providers.aws.services.dynamodb.dynamodb_service import DAX
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -141,5 +117,5 @@ class Test_dynamodb_accelerator_cluster_encryption_enabled:
assert search("has encryption at rest enabled", result[0].status_extended)
assert result[0].resource_id == cluster["ClusterName"]
assert result[0].resource_arn == cluster["ClusterArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -1,51 +1,20 @@
from boto3 import client, session
from boto3 import client
from moto import mock_dax, mock_dynamodb
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.dynamodb.dynamodb_service import DAX, DynamoDB
from prowler.providers.common.models import Audit_Metadata
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_REGION = "us-east-1"
from tests.providers.aws.audit_info_utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
class Test_DynamoDB_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test Dynamo Service
@mock_dynamodb
def test_service(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.service == "dynamodb"
@@ -53,7 +22,7 @@ class Test_DynamoDB_Service:
@mock_dynamodb
def test_client(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamodb = DynamoDB(audit_info)
for regional_client in dynamodb.regional_clients.values():
assert regional_client.__class__.__name__ == "DynamoDB"
@@ -62,7 +31,7 @@ class Test_DynamoDB_Service:
@mock_dynamodb
def test__get_session__(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.session.__class__.__name__ == "Session"
@@ -70,7 +39,7 @@ class Test_DynamoDB_Service:
@mock_dynamodb
def test_audited_account(self):
# Dynamo client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamodb = DynamoDB(audit_info)
assert dynamodb.audited_account == AWS_ACCOUNT_NUMBER
@@ -78,7 +47,7 @@ class Test_DynamoDB_Service:
@mock_dynamodb
def test__list_tables__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
# Create DynamoDB Tables
dynamodb_client.create_table(
TableName="test1",
@@ -105,19 +74,19 @@ class Test_DynamoDB_Service:
BillingMode="PAY_PER_REQUEST",
)
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 2
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[1].name == "test2"
assert dynamo.tables[0].region == AWS_REGION
assert dynamo.tables[1].region == AWS_REGION
assert dynamo.tables[0].region == AWS_REGION_US_EAST_1
assert dynamo.tables[1].region == AWS_REGION_US_EAST_1
# Test DynamoDB Describe Table
@mock_dynamodb
def test__describe_table__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
# Create DynamoDB Table
table = dynamodb_client.create_table(
TableName="test1",
@@ -135,12 +104,12 @@ class Test_DynamoDB_Service:
],
)["TableDescription"]
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 1
assert dynamo.tables[0].arn == table["TableArn"]
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[0].region == AWS_REGION
assert dynamo.tables[0].region == AWS_REGION_US_EAST_1
assert dynamo.tables[0].tags == [
{"Key": "test", "Value": "test"},
]
@@ -149,7 +118,7 @@ class Test_DynamoDB_Service:
@mock_dynamodb
def test__describe_continuous_backups__(self):
# Generate DynamoDB Client
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
# Create DynamoDB Table
table = dynamodb_client.create_table(
TableName="test1",
@@ -168,19 +137,19 @@ class Test_DynamoDB_Service:
PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)
# DynamoDB client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dynamo = DynamoDB(audit_info)
assert len(dynamo.tables) == 1
assert dynamo.tables[0].arn == table["TableArn"]
assert dynamo.tables[0].name == "test1"
assert dynamo.tables[0].pitr
assert dynamo.tables[0].region == AWS_REGION
assert dynamo.tables[0].region == AWS_REGION_US_EAST_1
# Test DAX Describe Clusters
@mock_dax
def test__describe_clusters__(self):
# Generate DAX Client
dax_client = client("dax", region_name=AWS_REGION)
dax_client = client("dax", region_name=AWS_REGION_US_EAST_1)
# Create DAX Clusters
iam_role_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/aws-service-role/dax.amazonaws.com/AWSServiceRoleForDAX"
dax_client.create_cluster(
@@ -204,19 +173,19 @@ class Test_DynamoDB_Service:
],
)
# DAX client for this test class
audit_info = self.set_mocked_audit_info()
audit_info = set_mocked_aws_audit_info()
dax = DAX(audit_info)
assert len(dax.clusters) == 2
assert dax.clusters[0].name == "daxcluster1"
assert dax.clusters[0].region == AWS_REGION
assert dax.clusters[0].region == AWS_REGION_US_EAST_1
assert dax.clusters[0].encryption
assert dax.clusters[0].tags == [
{"Key": "test", "Value": "test"},
]
assert dax.clusters[1].name == "daxcluster2"
assert dax.clusters[1].region == AWS_REGION
assert dax.clusters[1].region == AWS_REGION_US_EAST_1
assert dax.clusters[1].encryption
assert dax.clusters[1].tags == [
{"Key": "test", "Value": "test"},

View File

@@ -1,53 +1,24 @@
from re import search
from unittest import mock
from boto3 import client, session
from boto3 import client
from moto import mock_dynamodb
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012"
from tests.providers.aws.audit_info_utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
class Test_dynamodb_tables_kms_cmk_encryption_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_dynamodb
def test_dynamodb_no_tables(self):
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -68,7 +39,7 @@ class Test_dynamodb_tables_kms_cmk_encryption_enabled:
@mock_dynamodb
def test_dynamodb_table_kms_encryption(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
@@ -84,7 +55,9 @@ class Test_dynamodb_tables_kms_cmk_encryption_enabled:
)["TableDescription"]
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -106,12 +79,12 @@ class Test_dynamodb_tables_kms_cmk_encryption_enabled:
assert search("KMS encryption enabled", result[0].status_extended)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@mock_dynamodb
def test_dynamodb_table_default_encryption(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
@@ -126,7 +99,9 @@ class Test_dynamodb_tables_kms_cmk_encryption_enabled:
)["TableDescription"]
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -148,5 +123,5 @@ class Test_dynamodb_tables_kms_cmk_encryption_enabled:
assert search("DEFAULT encryption enabled", result[0].status_extended)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -1,53 +1,24 @@
from re import search
from unittest import mock
from boto3 import client, session
from boto3 import client
from moto import mock_dynamodb
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
AWS_REGION = "us-east-1"
AWS_ACCOUNT_NUMBER = "123456789012"
from tests.providers.aws.audit_info_utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_audit_info,
)
class Test_dynamodb_tables_pitr_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_account_arn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root",
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
organizations_metadata=None,
audit_resources=None,
mfa_enabled=False,
audit_metadata=Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
@mock_dynamodb
def test_dynamodb_no_tables(self):
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -68,7 +39,7 @@ class Test_dynamodb_tables_pitr_enabled:
@mock_dynamodb
def test_dynamodb_table_no_pitr(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
@@ -83,7 +54,9 @@ class Test_dynamodb_tables_pitr_enabled:
)["TableDescription"]
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -108,12 +81,12 @@ class Test_dynamodb_tables_pitr_enabled:
)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@mock_dynamodb
def test_dynamodb_table_with_pitr(self):
dynamodb_client = client("dynamodb", region_name=AWS_REGION)
dynamodb_client = client("dynamodb", region_name=AWS_REGION_US_EAST_1)
table = dynamodb_client.create_table(
TableName="test1",
AttributeDefinitions=[
@@ -132,7 +105,9 @@ class Test_dynamodb_tables_pitr_enabled:
)
from prowler.providers.aws.services.dynamodb.dynamodb_service import DynamoDB
current_audit_info = self.set_mocked_audit_info()
current_audit_info = set_mocked_aws_audit_info(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
@@ -156,5 +131,5 @@ class Test_dynamodb_tables_pitr_enabled:
)
assert result[0].resource_id == table["TableName"]
assert result[0].resource_arn == table["TableArn"]
assert result[0].region == AWS_REGION
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []